package es.ucm.fdi.sscheck.spark.demo
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.dstream.DStream
import org.junit.runner.RunWith
import org.scalacheck.Gen.const
import org.specs2.ScalaCheck
import org.specs2.Specification
import org.specs2.matcher.ResultMatchers
import org.specs2.runner.JUnitRunner
import es.ucm.fdi.sscheck.gen.BatchGen
import es.ucm.fdi.sscheck.prop.tl.DStreamProp
import es.ucm.fdi.sscheck.prop.tl.Formula
import es.ucm.fdi.sscheck.prop.tl.Formula.always
import es.ucm.fdi.sscheck.prop.tl.Formula.intToTimeout
import es.ucm.fdi.sscheck.prop.tl.Formula.resultFunToNow
import es.ucm.fdi.sscheck.spark.streaming.SharedStreamingContextBeforeAfterEach
/** Reproducer for a `NullPointerException` raised inside `DStreamProp.forAll`.
  *
  * The property `p1` is checked with checkpointing enabled and with the generator
  * `genNPE`. According to the analysis in the comment at the end of this class,
  * ScalaCheck can generate an empty test case (a `PDStream` with no batches) from
  * it, and evaluating that test case dereferences a formula that was never assigned.
  */
@RunWith(classOf[JUnitRunner])
class NPEAlwaysBug
  extends Specification
  with SharedStreamingContextBeforeAfterEach
  with ResultMatchers
  // with Serializable
  with ScalaCheck {

  // Spark configuration
  override def sparkMaster : String = "local[*]"
  override def batchDuration = Duration(500)
  override def defaultParallelism = 3
  // this is needed to reproduce the bug
  override def enableCheckpointing = true

  /** specs2 entry point: a single example, `p1`, run in `sequential` mode. */
  // The s2 text below is kept at column 0 on purpose: any indentation would become
  // part of the interpolated specification text.
  def is =
    sequential ^ s2"""
- where $p1
"""
  // Alternative spec that expects `p1` to fail instead of erroring out:
  // - where ${p1 must beFailing}

  /** Stream of batches produced by `BatchGen.ofN(1, 0)`, with no explicit length.
    * At ScalaCheck size 0 this yields an empty `PDStream()`, which triggers the NPE
    * (see the analysis at the end of the class).
    */
  def genNPE = BatchGen.always(BatchGen.ofN(1, 0))

  /** Same batches as `genNPE`, but with an explicit length argument of `5`
    * (presumably the number of batches), which avoids the empty test case.
    */
  def genOk = BatchGen.always(BatchGen.ofN(1, 0), 5)

  /** Checks the trivially true formula `always(true) during 10` against the
    * identity transformation of the generated stream. With `genNPE` this throws
    * the `NullPointerException` described below; swap in `genOk` to see it pass.
    */
  def p1 = {
    // Letter type of the formula: a pair of RDDs, presumably (input batch,
    // output batch) for the transformation under test.
    type U = (RDD[Int], RDD[Int])
    val formula : Formula[U] = always { u : U =>
      true
    } during 10
    DStreamProp.forAll(
      genNPE)( // NullPointerException, see below
      //genOk)( // works ok
      identity[DStream[Int]])(
      formula)
  }.set(minTestsOk = 2).verbose

  /*
  java.lang.NullPointerException
    at es.ucm.fdi.sscheck.prop.tl.DStreamProp$$anonfun$forAll$4.apply(DStreamProp.scala:178)
    at es.ucm.fdi.sscheck.prop.tl.DStreamProp$$anonfun$forAll$4.apply(DStreamProp.scala:131)
    at org.scalacheck.Prop$$anonfun$forAllNoShrink$1$$anonfun$2.apply(Prop.scala:516)
    at org.scalacheck.Prop$.secure(Prop.scala:458)
    at org.scalacheck.Prop$$anonfun$forAllNoShrink$1.apply(Prop.scala:516)
    at org.scalacheck.Prop$$anonfun$forAllNoShrink$1.apply(Prop.scala:511)
    at org.scalacheck.Prop$$anonfun$apply$5.apply(Prop.scala:293)
    at org.scalacheck.Prop$$anonfun$apply$5.apply(Prop.scala:292)
    at org.scalacheck.PropFromFun.apply(Prop.scala:21)
    at org.scalacheck.Test$.org$scalacheck$Test$$workerFun$1(Test.scala:321)
    at org.scalacheck.Test$$anonfun$2.apply(Test.scala:350)
    at org.scalacheck.Test$$anonfun$2.apply(Test.scala:350)
    at org.scalacheck.Platform$.runWorkers(Platform.scala:40)
    at org.scalacheck.Test$.check(Test.scala:350)
    at es.ucm.fdi.sscheck.spark.demo.NPEAlwaysBug.check(NPEAlwaysBug.scala:23)

  Root cause: genNPE is defined as BatchGen.always(BatchGen.ofN(1, 0)), which does
  not fix the number of batches, so its length is driven by the ScalaCheck size
  parameter. ScalaCheck starts from size 0, i.e. Gen.resize(0, genNPE), which
  generates PDStream(). As a result, the loop
    for (i <- 1 to testCaseDstream.length if (! propFailed))
  in the body of Prop.forAllNoShrink inside DStreamProp.forAll performs no
  iterations, because testCaseDstream is an empty prefix with no batches. So there
  is never a wait on onBatchCompletedSyncVar.take(batchCompletionTimeout).
  Consequently, the callback
    inputDStream1.foreachRDD { (input1Batch, time) => ... }
  in DStreamProp.forAll is never executed, and currFormula is never assigned
  formulaNext. So when
    val testCaseResult = currFormula.result.getOrElse(Prop.Undecided)
  is executed, we get an NPE because currFormula is null.

  Possible fixes:
  - replace val testCaseResult = currFormula.result.getOrElse(Prop.Undecided) with
    code that handles the case where currFormula is null
  - initialize currFormula to formulaNext instead of to null. This looks like a
    good idea anyway
  */
}