juanrh / sscheck

ScalaCheck for Spark
Apache License 2.0

"ERROR JobScheduler: Error generating jobs" on ScalaCheck tests for Streaming with low resources #20

Closed juanrh closed 8 years ago

juanrh commented 9 years ago

When the machine is saturated, errors like the following appear in the log, although sometimes the test succeeds anyway:

15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time 1440580922500 ms java.lang.NullPointerException

This can be reproduced on my Mac in ScalaCheckStreamingTest using:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.scalacheck.Gen
import org.scalacheck.Arbitrary.arbitrary
// DStreamProp comes from sscheck; === and `and` are specs2 matchers

def meanProp(testSubject: DStream[Double] => DStream[Long]) =
  DStreamProp.forAllAlways(
    // 100 batches of 30000 records each reproduce the "Error generating jobs
    // for time 1440538987900 ms java.lang.NullPointerException", which seems
    // to appear when the system is saturated. TODO: wrap the take on the sync
    // var in a Try and fail with a meaningful exception, and protect the
    // slice of the derived DStream
    Gen.listOfN(100, Gen.listOfN(30000, arbitrary[Double])))(
    // Gen.listOfN(10, Gen.listOfN(30, arbitrary[Double])))(
    testSubject)(
    (inputBatch: RDD[Double], transBatch: RDD[Long]) => {
      transBatch.count === 1 and
      inputBatch.count === transBatch.first
    }).set(workers = 1, minTestsOk = 10).verbose
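
The Try mentioned in that TODO could look something like the sketch below; takeBatchResult, resultSyncVar and timeoutMs are hypothetical names, but the idea follows the comment: the per-batch result is taken from a SyncVar, and a saturated system should produce a meaningful failure instead of an opaque one.

import scala.concurrent.SyncVar
import scala.util.{Failure, Success, Try}

// hypothetical helper: take the next per-batch result from the SyncVar,
// failing with a meaningful exception when the system is too saturated to
// produce the result within the timeout
def takeBatchResult[T](resultSyncVar: SyncVar[T], timeoutMs: Long): T =
  Try(resultSyncVar.take(timeoutMs)) match {
    case Success(result) => result
    case Failure(e) =>
      throw new IllegalStateException(
        s"could not take batch result after $timeoutMs ms, " +
        "probably because the system is saturated", e)
  }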

This error can be removed by modifying compute() in DynSeqQueueInputDStream so that, when the batch is empty, it returns the following instead of None:

else {
  // report an empty batch to the input info tracker and emit an empty RDD
  // instead of None
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, InputInfo(id, 0))
  Some(_sc.emptyRDD[A])
}

but that leads to mixed batches (records from different generated batches can end up in the same streaming batch), which break the test cases.

juanrh commented 9 years ago

A fragment of the log for this error:

15/08/26 11:22:02 WARN DStreamProp: finished test case 9 with result success
15/08/26 11:22:02 WARN SharedStreamingContext: stopping test Spark Streaming context
15/08/26 11:22:02 WARN SharedStreamingContext: creating test Spark Streaming context
15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time 1440580922500 ms
java.lang.NullPointerException
    at org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time 1440580922600 ms
java.lang.NullPointerException
    (stack trace identical to the one above)
15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time 1440580922700 ms
java.lang.NullPointerException
juanrh commented 9 years ago

If nothing else works, a new approach would be to use a ConstantInputDStream that always generates an RDD with a single 0, acting as a "tick" DStream. We would then apply transform to it, taking the ideas of http://mail-archives.us.apache.org/mod_mbox/spark-user/201403.mbox/%3CCAMwrk0=ZNRFuPGfpD+mdyHX3wM8fEODtNTYY3EmjH2B2c2GuuA@mail.gmail.com%3E to the extreme: on each tick, parallelize the corresponding batch in a way similar to what is done in DynSeqQueueInputDStream, maintaining a list that is mutated as in that class. With that approach we would lose the stats on the number of input records, which is not nice, so this is a last resort only.
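
A minimal sketch of that idea, assuming a mutable buffer of pending batches; tickStream and pendingBatches are hypothetical names, for illustration only:

import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}

def tickStream[A: ClassTag](
    ssc: StreamingContext,
    pendingBatches: ListBuffer[Seq[A]]): DStream[A] = {
  // the tick source: emits the same single-element RDD on every batch
  // interval, so compute() never has to return None
  val ticks = new ConstantInputDStream(ssc, ssc.sparkContext.parallelize(Seq(0)))
  // on each tick, discard the dummy RDD and parallelize the next pending
  // batch, mutating the shared list as DynSeqQueueInputDStream does
  ticks.transform { _ =>
    pendingBatches.synchronized {
      if (pendingBatches.isEmpty) ssc.sparkContext.emptyRDD[A]
      else ssc.sparkContext.parallelize(pendingBatches.remove(0))
    }
  }
}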

juanrh commented 8 years ago

Since sscheck release 0.2.1-SNAPSHOT, DynSeqQueueInputDStream is replaced by TestInputStream from spark-testing-base, which doesn't have these problems.
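
For reference, a rough usage sketch; the constructor signature of com.holdenkarau.spark.testing.TestInputStream assumed here (SparkContext, StreamingContext, batches, partition count) should be checked against the spark-testing-base sources:

import scala.reflect.ClassTag
import com.holdenkarau.spark.testing.TestInputStream
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// assumed constructor order: (sc, ssc, batches, numPartitions); verify
// against the spark-testing-base sources before relying on this
def makeTestInput[A: ClassTag](sc: SparkContext, ssc: StreamingContext,
                               batches: Seq[Seq[A]]): DStream[A] =
  new TestInputStream[A](sc, ssc, batches, 2)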

isabelbongo commented 5 years ago

ERROR spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: Required executor memory (12288+1228 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/o$
    at org.apache.spark.deploy.yarn.Client.verifyClusterResources(Client.scala:302)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:166)