package es.ucm.fdi.sscheck.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.dstream.DStream
import org.junit.runner.RunWith
import org.scalacheck.Gen.const
import org.specs2.ScalaCheck
import org.specs2.Specification
import org.specs2.matcher.ResultMatchers
import org.specs2.runner.JUnitRunner
import es.ucm.fdi.sscheck.gen.BatchGen
import es.ucm.fdi.sscheck.prop.tl.DStreamProp
import es.ucm.fdi.sscheck.prop.tl.Formula
import es.ucm.fdi.sscheck.prop.tl.Formula.always
import es.ucm.fdi.sscheck.prop.tl.Formula.intToTimeout
import es.ucm.fdi.sscheck.prop.tl.Formula.resultFunToNow
import es.ucm.fdi.sscheck.spark.streaming.SharedStreamingContextBeforeAfterEach
@RunWith(classOf[JUnitRunner])
class ValGenBug
  extends Specification
  with SharedStreamingContextBeforeAfterEach
  with ResultMatchers
  with Serializable
  with ScalaCheck {

  // Spark configuration
  override def sparkMaster : String = "local[*]"
  override def batchDuration = Duration(500)
  override def defaultParallelism = 3
  // this is needed to reproduce the bug
  override def enableCheckpointing = true

  def is =
    sequential ^ s2"""
    - where $p1
    """

  val genNPE = BatchGen.always(BatchGen.ofN(1, 0)) // NotSerializableException, see below
  // def genNPE = BatchGen.always(BatchGen.ofN(1, 0)) // works ok
  def genOk = BatchGen.always(BatchGen.ofN(1, 0), 5)

  def p1 = {
    type U = (RDD[Int], RDD[Int])
    val hashtagBatch = (_ : U)._2
    val formula : Formula[U] = always { u : U =>
      true
    } during 10

    DStreamProp.forAll(
      genOk)(
      identity[DStream[Int]])(
      formula)
  }.set(minTestsOk = 2).verbose
/*
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.scalacheck.Gen$$anon$6
Serialization stack:
- object not serializable (class: org.scalacheck.Gen$$anon$6, value: org.scalacheck.Gen$$anon$6@1f3b992)
- field (class: es.ucm.fdi.sscheck.spark.ValGenBug, name: genNPE, type: interface org.scalacheck.Gen)
- object (class es.ucm.fdi.sscheck.spark.ValGenBug, es.ucm.fdi.sscheck.spark.ValGenBug@52b06bef)
- field (class: es.ucm.fdi.sscheck.spark.ValGenBug$$anonfun$3, name: $outer, type: class es.ucm.fdi.sscheck.spark.ValGenBug)
- object (class es.ucm.fdi.sscheck.spark.ValGenBug$$anonfun$3, <function1>)
- field (class: es.ucm.fdi.sscheck.prop.tl.Now$$anonfun$apply$1, name: evidence$3$1, type: interface scala.Function1)
- object (class es.ucm.fdi.sscheck.prop.tl.Now$$anonfun$apply$1, <function1>)
- field (class: es.ucm.fdi.sscheck.prop.tl.Now, name: p, type: interface scala.Function1)
- object (class es.ucm.fdi.sscheck.prop.tl.Now, Now(<function1>))
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, List(Now(<function1>), Next(Now(<function1>)), Next(Next(Now(<function1>))), Next(Next(Next(Now(<function1>)))), Next(Next(Next(Next(Now(<function1>))))), Next(Next(Next(Next(Next(Now(<function1>)))))), Next(Next(Next(Next(Next(Next(Now(<function1>))))))), Next(Next(Next(Next(Next(Next(Next(Now(<function1>)))))))), Next(Next(Next(Next(Next(Next(Next(Next(Now(<function1>))))))))), Next(Next(Next(Next(Next(Next(Next(Next(Next(Now(<function1>))))))))))))
- field (class: es.ucm.fdi.sscheck.prop.tl.And, name: phis, type: interface scala.collection.Seq)
- object (class es.ucm.fdi.sscheck.prop.tl.NextAnd, And(List(Now(<function1>), Next(Now(<function1>)), Next(Next(Now(<function1>))), Next(Next(Next(Now(<function1>)))), Next(Next(Next(Next(Now(<function1>))))), Next(Next(Next(Next(Next(Now(<function1>)))))), Next(Next(Next(Next(Next(Next(Now(<function1>))))))), Next(Next(Next(Next(Next(Next(Next(Now(<function1>)))))))), Next(Next(Next(Next(Next(Next(Next(Next(Now(<function1>))))))))), Next(Next(Next(Next(Next(Next(Next(Next(Next(Now(<function1>)))))))))))))
- field (class: es.ucm.fdi.sscheck.prop.tl.DStreamProp$$anonfun$forAll$3, name: formulaNext$1, type: interface es.ucm.fdi.sscheck.prop.tl.NextFormula)
- object (class es.ucm.fdi.sscheck.prop.tl.DStreamProp$$anonfun$forAll$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@554c4eaa)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@7e7fe6d)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@31e76a8d)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@31e76a8d, org.apache.spark.streaming.dstream.ForEachDStream@7e7fe6d, org.apache.spark.streaming.dstream.ForEachDStream@554c4eaa))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.DynSingleSeqQueueInputDStream, org.apache.spark.streaming.dstream.DynSingleSeqQueueInputDStream@b112b13)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.DynSingleSeqQueueInputDStream@b112b13))
- writeObject data (class: org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@67de7a99)
- field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@5114b7c7)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:558)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:595)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
at es.ucm.fdi.sscheck.prop.tl.DStreamProp$.forAll(DStreamProp.scala:107)
at es.ucm.fdi.sscheck.spark.ValGenBug.p1(ValGenBug.scala:56)
at es.ucm.fdi.sscheck.spark.ValGenBug$$anonfun$is$1$$anonfun$apply$1.apply(ValGenBug.scala:39)
at es.ucm.fdi.sscheck.spark.ValGenBug$$anonfun$is$1$$anonfun$apply$1.apply(ValGenBug.scala:39)
* */
}
Note that this only happens when enableCheckpointing = true. It is caused by PDStreamGen.always returning a Gen object, which is not Serializable. This is more an issue of Spark than of this library, and using a def is a simple workaround, so this will be treated as a won't fix.
Another option is using @transient val genNPE, which works fine because in the implementation of DStreamProp.forAll the generators are only used in the driver, so the field is never needed after deserialization.
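A minimal sketch of both workarounds, using the same generator expressions as in the spec above (the class name TransientGenSketch and the field names are just for illustration, not part of the original file):

import es.ucm.fdi.sscheck.gen.BatchGen

class TransientGenSketch extends Serializable {
  // @transient tells Java serialization to skip this field, so the non-serializable
  // Gen never travels with the checkpointed DStream closures; it is only read in the driver
  @transient val genTransient = BatchGen.always(BatchGen.ofN(1, 0))

  // a def is not stored as a field at all, so nothing non-serializable is
  // captured together with the enclosing instance
  def genDef = BatchGen.always(BatchGen.ofN(1, 0))
}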