I've been struggling with some flaky tests that rely on reading stdout returned from a NonBlockingProcess actor. Essentially, my tests sometimes time out waiting for the output, even though I can see that it definitely makes it as far as the NuProcess onStdout handler function.
Indeed, I have occasionally seen NonBlockingProcessSpec fail with a similar-looking error. In fact, the CI build for the PR I contributed recently happened to fail with it:
[info] NonBlockingProcessSpec:
[info] A NonBlockingProcess
[info] - should read from stdin and write to stdout *** FAILED ***
[info] java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
[info] at scala.Predef$.assert(Predef.scala:170)
[info] at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:404)
[info] at akka.testkit.TestKit.expectMsgPF(TestKit.scala:828)
[info] at akka.contrib.process.NonBlockingProcessSpec$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(NonBlockingProcessSpec.scala:40)
[info] at akka.contrib.process.NonBlockingProcessSpec$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(NonBlockingProcessSpec.scala:29)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
[info] ...
[info] - should allow a blocking process that is blocked to be destroyed
[info] - should allow a blocking process that is blocked to be stopped
[info] - should be able to create the reference.conf specified limit of processes
[info] - should detect when a process has exited while having orphaned children that live on
[info] - should close stdin when input stream terminates
Now, my theory is that the stdioMaterializer is being shut down before the client (the parent of the NonBlockingProcess actor) can consume the data. Although there is logic to shut it down only once NuProcess has deemed the stream closed, there is still no guarantee that the client has fully consumed the stdout/stderr Sources.
I admit I don't fully understand what effect shutting down the materializer has on the SourceQueue/Source returned by materializing Source.queue - but the docs suggest it shuts down all underlying actors and abruptly stops queue processing, so it sounds like a potential issue.
I have already written a patch that replaces stdioMaterializer with an implicit parameter on the actor, meaning it is the client's responsibility to create the materializer (and handle its life cycle accordingly). I'm happy to submit a PR if you are; it also cleans up the code nicely.
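To illustrate the idea (this is not the actual patch), here is a minimal sketch of a client-owned materializer. The object name and the `stdoutQueue` and `roundTrip` helpers are hypothetical stand-ins for the actor's stdout plumbing, and the sketch assumes the Akka Streams 2.4/2.5-era API (`ActorMaterializer`, `Source.queue`, `Sink.asPublisher`):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ClientOwnedMaterializerSketch {

  // Hypothetical stand-in for the process actor's stdout plumbing.
  // The materializer is an implicit parameter, so the caller owns its life cycle.
  def stdoutQueue()(implicit mat: Materializer)
      : (SourceQueueWithComplete[ByteString], Source[ByteString, NotUsed]) = {
    val (queue, publisher) =
      Source.queue[ByteString](16, OverflowStrategy.backpressure)
        .toMat(Sink.asPublisher[ByteString](fanout = false))(Keep.both)
        .run()
    (queue, Source.fromPublisher(publisher))
  }

  // Pushes one chunk, completes the queue, and drains the Source to a String.
  def roundTrip(payload: String)(implicit mat: Materializer): String = {
    val (queue, stdout) = stdoutQueue()
    val collected = stdout.runFold(ByteString.empty)(_ ++ _)
    Await.result(queue.offer(ByteString(payload)), 3.seconds)
    queue.complete()
    Await.result(collected, 3.seconds).utf8String
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    // Created by the client, not by the process actor.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try println(roundTrip("hello"))
    finally {
      // Shut down only after the client has finished consuming the output.
      materializer.shutdown()
      system.terminate()
    }
  }
}
```

The point of the design is the ordering in `main`: the materializer is shut down only once the consumer's fold has completed, which the actor cannot guarantee when it owns the materializer itself.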