ooyala / spark-jobserver

REST job server for Spark. Note that this is *not* the mainline open source version. For that, go to https://github.com/spark-jobserver/spark-jobserver. This fork now serves as a semi-private repo for Ooyala.

SparkContext shut down if runJob() returns a stream runtime object #45

Open adrianfr opened 10 years ago

adrianfr commented 10 years ago

If the runJob() method returns a Stream instead of a materialized Seq, the job executes successfully, but is then "cancelled because SparkContext was shut down".

Here is a code sample:

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import spark.jobserver._

object StreamBugJob extends SparkJob with NamedRddSupport {

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {
    val instants = List(1, 2, 3, 4, 5, 6)
    // sliding(2) returns an Iterator; toSeq wraps it in a lazy Stream.
    // toSeq does NOT force the iterator -- call toList instead to force it.
    val intervals = instants.sliding(2).toSeq

    val numEvents = config.getInt("numEvents")
    val intRDD: RDD[Int] = sc.makeRDD(1 to numEvents)

    val userRdd: Seq[RDD[Int]] = intervals.map(_ => intRDD)

    // counts is lazy: the count() calls have not all run when this returns
    val counts = userRdd.map(_.count())
    counts
  }

}
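Until the root cause is fixed, a defensive workaround is to force any lazy collection before it leaves runJob, so all Spark actions complete while the SparkContext is still alive. The materialize helper below is hypothetical (not part of the job server API), shown as a minimal sketch:

```scala
object Materialize {
  // Hypothetical helper: eagerly copy a possibly-lazy Seq (e.g. a Stream)
  // into a strict List, so evaluation finishes before the result is handed
  // back for serialization, rather than racing context shutdown.
  def materialize[A](xs: Seq[A]): List[A] = xs.toList
}
```

In the job above, runJob would end with `Materialize.materialize(counts)` instead of `counts`.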

The exception occurs (on a standalone Spark deployment) when numEvents is 2000000 or more; with 1000000 or fewer there is no exception. This threshold may vary depending on your configuration.

Note that sliding(2) returns an Iterator, and toSeq wraps it in a Stream rather than forcing it. If toSeq is replaced with toList, the iterator is forced, there is no exception, and all is well. The docs say that a Seq return type is converted to JSON, but there may be an issue when the Seq is actually a Stream (a lazy subclass of Seq).
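To see why toSeq leaves work pending, here is a minimal sketch with no Spark involved (assuming Scala 2.10/2.11 collection semantics, where Iterator.toSeq yields a Stream): mapping over a Stream evaluates only the head eagerly, and the remaining elements are evaluated later, when the result is forced, e.g. during serialization.

```scala
object LazyStreamDemo {
  // Returns the number of map-body evaluations before and after forcing.
  def run(): (Int, Int) = {
    var evaluated = 0
    // Stream.map is head-strict but tail-lazy: only the first element's
    // body runs here; the rest run when the Stream is forced.
    val mapped: Stream[Int] = Stream(1, 2, 3).map { x => evaluated += 1; x * 10 }
    val beforeForce = evaluated
    mapped.toList // forcing materializes the remaining elements
    (beforeForce, evaluated)
  }

  def main(args: Array[String]): Unit = println(run()) // prints (1,3)
}
```

In the job above, the deferred map bodies are `count()` actions on RDDs, so forcing the Stream after runJob returns means running Spark jobs against a context that may already be shutting down.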

The exception is below:

org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:640)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:639)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1237)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[2014-07-31 13:49:00,537] INFO he.spark.executor.Executor [] [akka://JobServer/user/context-supervisor/4704bfdd-com.demo.StreamBugJob] - Serialized size of result for 6 is 597
[2014-07-31 13:49:00,537] INFO he.spark.executor.Executor [] [akka://JobServer/user/context-supervisor/4704bfdd-com.demo.StreamBugJob] - Sending result for 6 directly to driver
[2014-07-31 13:49:00,537] INFO he.spark.executor.Executor [] [akka://JobServer/user/context-supervisor/4704bfdd-com.demo.StreamBugJob] - Finished task ID 6
[2014-07-31 13:49:00,537] ERROR cheduler.TaskSchedulerImpl [] [akka://JobServer/user/context-supervisor/4704bfdd-com.demo.StreamBugJob] - Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@458eb02b rejected from java.util.concurrent.ThreadPoolExecutor@1acffaba[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 4]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:45)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$liftedTree2$1$1.apply(TaskSchedulerImpl.scala:285)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$liftedTree2$1$1.apply(TaskSchedulerImpl.scala:282)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:282)
    at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:267)
    at org.apache.spark.scheduler.local.LocalActor$$anonfun$receive$1.applyOrElse(LocalBackend.scala:58)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[2014-07-31 13:49:00,549] INFO he.spark.executor.Executor [] [akka://JobServer/user/context-supervisor/4704bfdd-com.demo.StreamBugJob] - Serialized size of result for 4 is 597