juanrh / sscheck

ScalaCheck for Spark
Apache License 2.0
63 stars, 9 forks

Parallel execution of scalacheck test cases for streaming properties #19

Open juanrh opened 9 years ago

juanrh commented 9 years ago

DStreamProp.forAllAlways executes the test cases sequentially, which is fine for local tests with a fine-tuned batch interval. Adapting the ideas of StreamingContextDirectReceiverTest, the test cases could perhaps be executed in parallel. But instead of using a single inputDStream that multiplexes the test cases by using pairs, which makes it impossible to use DStream transformers as test subjects, the idea is to use a Vector (or another fast immutable indexed Seq) of DStreams. We need access to the number of workers as an implicit value wrapping an Int (something like Parallelism), and we give each worker an index into that Vector of DStreams with something like:

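  import java.util.concurrent.atomic.AtomicInteger

  // Each thread gets a distinct id the first time it calls id.get
  val counter2 = new AtomicInteger(0)
  val id = new ThreadLocal[Int] {
    override def initialValue(): Int = counter2.getAndIncrement()
  }
  for (_ <- 1 to 4) {
    new Thread(new Runnable {
      def run(): Unit = {
        println(s"I got id ${id.get}, said ${Thread.currentThread()}")
        Thread.sleep(500)
        // The id stays the same for the whole lifetime of the thread
        println(s"I told you I got id ${id.get}, said ${Thread.currentThread()}")
      }
    }).start()
  }
  // Give the worker threads time to finish before the snippet ends
  Thread.sleep(2000)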
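A minimal sketch of how that index could select a slot in the Vector, under some assumptions: `Parallelism` and `WorkerSlots.generate` are hypothetical names, and each DStream is replaced by a `ConcurrentLinkedQueue` of generated batches so the example runs without Spark. Each worker thread writes only to the slot given by its `ThreadLocal` id, and the calling thread reads all slots after joining the workers.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical implicit wrapper for the number of workers
final case class Parallelism(numWorkers: Int)

object WorkerSlots {
  // Returns one slot per worker; a slot stands in for one DStream of the Vector
  def generate()(implicit parallelism: Parallelism): Vector[ConcurrentLinkedQueue[Seq[Int]]] = {
    val slots = Vector.fill(parallelism.numWorkers)(new ConcurrentLinkedQueue[Seq[Int]]())
    val counter = new AtomicInteger(0)
    // Each worker thread gets a distinct index the first time it calls id.get
    val id = new ThreadLocal[Int] {
      override def initialValue(): Int = counter.getAndIncrement()
    }
    val workers = Vector.fill(parallelism.numWorkers) {
      new Thread(new Runnable {
        def run(): Unit = {
          val myId = id.get
          // A worker only writes to its own slot, so no queue is shared between workers
          slots(myId).add(Seq.fill(3)(myId))
        }
      })
    }
    workers.foreach(_.start())
    // join() makes every slot's contents visible to the calling thread afterwards
    workers.foreach(_.join())
    slots
  }
}
```

With Spark, each slot could instead back its own input DStream, for example through `StreamingContext.queueStream`, which reads RDDs from a `mutable.Queue`; that wiring is not shown or tested here.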

This way each worker generates data for a different DStream, which should avoid thread-safety problems, and the action checking the result just iterates over the Vector of DStreams, similarly to what is done in StreamingContextDirectReceiverTest.
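The checking side can be sketched in the same spirit: the driver walks the Vector, one entry per worker, and evaluates the property on each entry. `DriverCheck.failingCases` is a hypothetical helper, and plain `Seq[Int]` batches stand in for the contents of each DStream.

```scala
object DriverCheck {
  // The driver iterates over the Vector (one entry per worker) and checks each entry;
  // it returns the indices of the entries that violate the property
  def failingCases(outputs: Vector[Seq[Int]], prop: Seq[Int] => Boolean): Vector[Int] =
    outputs.zipWithIndex.collect { case (batch, i) if !prop(batch) => i }
}
```

In Spark, the per-entry check would presumably run inside an output action such as `foreachRDD` registered on each DStream of the Vector.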

juanrh commented 9 years ago

The Vector of DStreams cannot be replaced by a ThreadLocal variable, because the driver thread needs to access each of the DStreams in the Vector.
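A small stand-alone illustration of why a `ThreadLocal` cannot replace the Vector: a value set by a worker thread is invisible to every other thread, so the driver only ever sees its own copy. The object and method names are illustrative only.

```scala
import java.util.concurrent.atomic.AtomicReference

object ThreadLocalVisibility {
  // Every thread starts with its own copy holding "unset"
  val slot: ThreadLocal[String] = new ThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  // Returns (value seen by the worker, value seen by the calling thread)
  def demo(): (String, String) = {
    val seenByWorker = new AtomicReference[String]("")
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        slot.set("worker stream")
        seenByWorker.set(slot.get)
      }
    })
    worker.start()
    worker.join()
    // The calling (driver) thread reads its own copy, not the worker's
    (seenByWorker.get, slot.get)
  }
}
```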

juanrh commented 9 years ago

If we are already specifying the number of workers outside the prop, we could think of it not as a number of workers but as a number of multiplexed test cases, and work in a single thread as in actorSendingPropMultiplex from https://github.com/juanrh/sscheck/blob/streamingDataSendExperiments/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/StreamingContextActorReceiverTest.scala. This way the program is simpler and only one thread has access to the Vector of DStreams; if we want to parallelize anyway, we can create the threads or a thread pool ourselves and have more control over them. Some care must be taken to avoid blocking the driver thread.

The downside is that this way we cannot use the shrinking mechanism of ScalaCheck: a failure for one test case may surface during the execution of another, and we would just change the message of the Specs2 Result, so ScalaCheck does not notice which test case actually failed. So letting ScalaCheck handle the workers may be the better option anyway. We currently don't use shrinking, but it could be enabled optionally in the future, and it is an important feature.
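A minimal sketch of the single-threaded alternative, assuming hypothetical names throughout: `numCases` plays the role of the number of multiplexed test cases, `gen` stands in for a ScalaCheck generator producing the batches of each case, and `Either[String, Unit]` stands in for the Specs2 `Result`. All cases are stepped batch by batch in one thread, and the outcome is a single result whose message lists the failing case indices.

```scala
import scala.collection.mutable

object Multiplexed {
  // Runs numCases test cases in a single thread, stepping all of them batch by batch.
  // gen(i) yields the batches of case i; prop checks a single batch.
  def checkAll(numCases: Int, gen: Int => Seq[Seq[Int]], prop: Seq[Int] => Boolean): Either[String, Unit] = {
    val cases = Vector.tabulate(numCases)(gen)
    val numBatches = if (cases.isEmpty) 0 else cases.map(_.size).max
    val failed = mutable.Set.empty[Int]
    for (t <- 0 until numBatches; (batches, i) <- cases.zipWithIndex if t < batches.size) {
      if (!prop(batches(t))) failed += i
    }
    // Every case is folded into one result: only the message says which cases failed
    if (failed.isEmpty) Right(())
    else Left(s"failed cases: ${failed.toSeq.sorted.mkString(", ")}")
  }
}
```

Because the per-case failures are folded into one message, the property framework never sees which generated input failed, which is why shrinking is lost with this design.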