holdenk / spark-testing-base

Base classes to use when writing tests with Spark
Apache License 2.0

Add support for quickcheck DStreams #15

Open holdenk opened 9 years ago

holdenk commented 9 years ago

Add support for quickcheck DStreams.

mahmoudhanafy commented 8 years ago

I tried to implement a DStream generator, but I ran into a problem. I created a StreamingContext instance and used it to generate InputDStreams. Consider the following code:

val ssc: StreamingContext = new StreamingContext(sc, Seconds(1))
val generator: Gen[InputDStream[Int]] = DStreamGenerator.genInputDStream[Int](ssc)(Arbitrary.arbitrary[Int])

val property =
  forAll(generator) {
    stream => {
      stream.saveAsTextFiles("/tmp/hanafy/")
      ssc.start()
      ssc.awaitTerminationOrTimeout(10000)
      true
    }
  }

Only the first stream is saved, and then I got the following exception:

Caused by: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported

Only the first InputDStream is added before the streaming context is started, so I tried to stop the StreamingContext after saving the stream:

forAll(generator) {
  stream => {
    stream.saveAsTextFiles("/tmp/hanafy/")
    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop(stopSparkContext = false)

    true
  }
}

But I got the following exception:

Caused by: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported

How should I deal with this?

mahmoudhanafy commented 8 years ago

Should I create a new StreamingContext for every stream?
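
For reference, a rough sketch of what "a new StreamingContext for every stream" could look like. Both exceptions above are consistent with Spark's documented lifecycle: once a context has been started no new streaming computations can be added to it, and once it has been stopped it cannot be restarted, but a SparkContext can be reused for a new StreamingContext after the previous one is stopped with stopSparkContext = false. The sketch assumes the generator produces plain batches of data instead of an InputDStream tied to one context, and feeds them through queueStream. The names FreshContextPerCase, runBatches and runProperty are made up for illustration and are not part of spark-testing-base, and the timeout-based wait is a simplification that may need tuning.

```scala
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.scalacheck.{Gen, Prop, Test}

// Hypothetical helper object, not part of spark-testing-base.
object FreshContextPerCase {

  // Runs the given batches through a brand-new StreamingContext and returns
  // every element that reached the output operation.
  def runBatches(sc: SparkContext, batches: Seq[Seq[Int]]): Seq[Int] = {
    val ssc = new StreamingContext(sc, Seconds(1))
    val collected = mutable.ArrayBuffer.empty[Int]
    try {
      // One RDD per generated batch; queueStream emits one RDD per interval.
      val queue = mutable.Queue[RDD[Int]](batches.map(b => sc.parallelize(b)): _*)
      val stream = ssc.queueStream(queue, oneAtATime = true)
      // All inputs and outputs are registered BEFORE start().
      stream.foreachRDD { rdd =>
        collected.synchronized { collected ++= rdd.collect() }
      }
      ssc.start()
      // Assumption: a couple of extra intervals is enough to drain the queue.
      ssc.awaitTerminationOrTimeout((batches.size + 2) * 1000L)
    } finally {
      // Keep the SparkContext alive so the next case can reuse it.
      ssc.stop(stopSparkContext = false, stopGracefully = true)
    }
    collected.synchronized(collected.toList)
  }

  // Generates data only; the DStream is built inside the property.
  val batchesGen: Gen[List[List[Int]]] =
    Gen.listOfN(2, Gen.listOf(Gen.choose(0, 100)))

  // Checks a few cases, each with its own StreamingContext.
  def runProperty(): Boolean = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("fresh-context-per-case")
    val sc = new SparkContext(conf)
    try {
      val property = Prop.forAll(batchesGen) { batches =>
        runBatches(sc, batches).sorted == batches.flatten.sorted
      }
      // Each case takes several seconds, so keep the number of cases small.
      val params = Test.Parameters.default.withMinSuccessfulTests(3)
      Test.check(params, property).passed
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"property passed: ${runProperty()}")
}
```

The trade-off is speed: every generated case pays for starting and stopping a context, so the number of cases probably has to stay low.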