Async systems gain performance by switching to another task when the current task gets blocked waiting for IO. In the implementation of DStreamProp.forAllAlways we have a huge blocking IO wait for the next batch to be completed. In local mode it is still the local machine which is working, but if sparkMaster pointed to a cluster then the driver process would just be waiting. On the other hand, when the new batch completes the driver suddenly has a lot of work to do in the corresponding foreachRDD, updating the result of the property to account for the new batch. When DStreamProp.forAll is introduced in the future, that update work will often be non-trivial if the formula gets complicated. We can fix this by noting that the current design of DStreamProp.forAllAlways started from the assumption that we would be sending data from Prop.forAll after each batch interval, because that is what we had in previous attempts based on actor receivers. We don't need to do that anymore; instead we could:
Just like in the current implementation of DStreamProp.forAll, each ScalaCheck test case is sent completely to a DynSeqQueueInputDStream at the beginning of the test. But now, instead of waiting for as many batches as the size of the test, we create a formula object for the test case and wait for it to be resolved.
The formula could store the test case so we can fail with the corresponding message. The downside of that is that we definitively lose shrinking, as mentioned in #19. So another option is to just block the execution of each test case until the corresponding formula is resolved. In fact, this way we don't need the formula to store the test case, as that is already handled by the ScalaCheck runtime; that sounds like a better idea.
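A minimal sketch of that blocking option, assuming a hypothetical Formula trait with a result future and a hypothetical enqueueTestCase callback into the DynSeqQueueInputDStream; these names are illustrative, not actual sscheck API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration

trait Formula[T] {
  // completed by the driver-side evaluation once it has seen enough batches to decide
  def result: Future[Boolean]
}

def runTestCase[T](testCase: Seq[Seq[T]],
                   newFormula: () => Formula[T],
                   enqueueTestCase: Seq[Seq[T]] => Unit,
                   maxWait: FiniteDuration): Boolean = {
  val formula = newFormula()
  // the whole test case is sent to the DynSeqQueueInputDStream at the start, as today
  enqueueTestCase(testCase)
  // block this test case until the formula is resolved, instead of sleeping for a
  // fixed number of batch intervals; the test case stays in the hands of the
  // ScalaCheck runtime, so shrinking is not affected
  Await.result(formula.result, maxWait)
}
```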
We can still check each batch completion against a timeout to ensure the batches are not too slow, but that is orthogonal, and it should even be performed independently from the test case execution: we could have some tasks executing the test cases, and another independent task just checking that the actual batch completion delay doesn't deviate too much from the batch interval. If this task detects a failure it might signal it to all the formulas, so all of them fail with a timeout exception; then, whichever test case wins in the ScalaCheck runtime, we know it fails with a timeout exception.
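As a rough sketch, that watchdog task could be a Spark StreamingListener registered on the StreamingContext; failAllFormulas is a hypothetical callback that would signal every pending formula:

```scala
import java.util.concurrent.TimeoutException

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchDelayWatchdog(batchIntervalMs: Long,
                         maxDelayMs: Long,
                         failAllFormulas: Throwable => Unit) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val delayMs = info.schedulingDelay.getOrElse(0L) + info.processingDelay.getOrElse(0L)
    if (delayMs > maxDelayMs) {
      // all formulas fail with a timeout, so whichever test case wins in the
      // ScalaCheck runtime, we know it fails with a timeout exception
      failAllFormulas(new TimeoutException(
        s"batch ${info.batchTime} completed $delayMs ms late, batch interval is $batchIntervalMs ms"))
    }
  }
}

// registered independently from the test case execution with something like:
//   ssc.addStreamingListener(new BatchDelayWatchdog(batchIntervalMs, maxDelayMs, failAll))
```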
Formulas can be evaluated asynchronously at the driver, so at each foreachRDD the driver just passes the new batch to the formula so it can update itself. Actors look like a good fit for this kind of lightweight, updatable task with independent state. WARNING: actions on the RDDs of a batch might be launched from a formula after some of those RDDs have already been garbage collected, if the update of the formula is too slow. We could use StreamingContext.remember to increase the memory of the DStreams, although that uses more memory. That could be exposed as a parameter that can be tuned depending on the resources of the machine running the test, and on whether or not cluster mode is being used.
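A possible shape for that, using an Akka actor per formula that is fed each batch from foreachRDD; FormulaActor and NewBatch are made-up names and the update logic is only a placeholder:

```scala
import akka.actor.{Actor, ActorRef}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import scala.concurrent.Promise

case class NewBatch[T](rdd: RDD[T])

class FormulaActor(result: Promise[Boolean]) extends Actor {
  def receive: Receive = {
    case NewBatch(rdd) =>
      // placeholder update: a real formula would run its assertions for this batch
      // (possibly launching RDD actions) and complete `result` once it can decide
      if (rdd.isEmpty() && !result.isCompleted) result.trySuccess(false)
  }
}

// e.g. val formula = actorSystem.actorOf(Props(new FormulaActor(resultPromise)))
def wire[T](ssc: StreamingContext, dstream: DStream[T], formula: ActorRef): Unit = {
  // keep generated RDDs around longer, so a slow formula update doesn't end up
  // launching actions on RDDs that Spark Streaming has already forgotten; the
  // duration could be the tunable parameter mentioned above
  ssc.remember(Minutes(1))
  // the driver just passes the new batch on and returns immediately
  dstream.foreachRDD { rdd => formula ! NewBatch(rdd) }
}
```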
I think this issue should be resolved only after #19, and with a solution compatible with #19.