amplab / graphx

Former GraphX development repository. GraphX has been merged into Apache Spark; please submit pull requests there.
https://github.com/apache/spark
Apache License 2.0

PageRank causes java.util.NoSuchElementException #52

Open dcrankshaw opened 10 years ago

dcrankshaw commented 10 years ago

When running PageRank on a cluster, I sometimes hit a NoSuchElementException that originates somewhere in VertexSetRDD. The full stack trace and command are below. The line numbers may be slightly off because of debugging printlns.

Command:

/root/graphx/run-example org.apache.spark.graph.Analytics spark://ec2-54-224-159-106.compute-1.amazonaws.com:7077 pagerank hdfs://ec2-54-224-159-106.compute-1.amazonaws.com:9000/soc-LiveJournal1.txt --numIter=10 --numEPart=128

Stack Trace:

java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:314)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:313)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:64)
    at org.apache.spark.graph.VertexSetRDD.compute(VertexSetRDD.scala:149)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:159)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
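
For readers following the trace above: its top frames show the exception being raised by a `next()` call made from inside a closure, which is what happens when `next()` is called on an iterator with no elements left. The thread does not establish why the iterator was exhausted, so the following is only a minimal plain-Scala sketch of that general failure mode and of a guarded alternative. It does not use Spark, and `firstUnsafe` and `firstSafe` are hypothetical names introduced for illustration, not GraphX code.

```scala
import scala.util.Try

object EmptyIteratorSketch {
  // Hypothetical helper: takes the first element without checking, the way a
  // per-partition closure might if it assumes the iterator is never empty.
  def firstUnsafe[A](iter: Iterator[A]): A = iter.next()

  // Guarded variant: returns None instead of throwing when nothing is left.
  def firstSafe[A](iter: Iterator[A]): Option[A] =
    if (iter.hasNext) Some(iter.next()) else None

  def main(args: Array[String]): Unit = {
    // The unguarded call on an empty iterator throws NoSuchElementException,
    // which Try captures as a Failure.
    println(Try(firstUnsafe(Iterator[Int]())).isFailure)
    // The guarded calls handle both the empty and the non-empty case.
    println(firstSafe(Iterator[Int]()))
    println(firstSafe(Iterator(42)))
  }
}
```

Whether a guard like this would be the right fix here is an open question; it would hide the symptom without explaining why the iterator had no elements.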

dcrankshaw commented 10 years ago

Interestingly, https://github.com/amplab/graphx/pull/51 seems to fix the issue.

rxin commented 10 years ago

simple code is less buggy :)

jegonzal commented 10 years ago

Hmm, is that error in the master branch? Those line numbers don't seem to "line-up." I guess for now maybe it makes sense to merge #51 since it is simpler.

ccsevers commented 10 years ago

I'm running into this issue in the latest version, built off of #132.

The code I'm running is pretty simple:

    val input = sc.sequenceFile[VectorWritable, VectorWritable](inputPath, classOf[VectorWritable], classOf[VectorWritable])
    // not even parsing the vectors, just making some big graph
    val edges = input.flatMap {
      case (vec1, vec2) =>
        Seq(Edge(Random.nextLong(), Random.nextLong(), 1), Edge(Random.nextLong(), Random.nextLong(), 1))
    }
    val g = Graph.fromEdges(edges, 1)
    val cc = ConnectedComponents.run(g)
    cc.vertices.count()

The error I see is this:

14/01/09 10:38:46 WARN scheduler.TaskSetManager: Lost TID 400 (task 4.0:0)
14/01/09 10:38:46 WARN scheduler.TaskSetManager: Loss was due to java.util.NoSuchElementException
java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:34)
    at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:33)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:242)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

This is running on a Hadoop 2.2.0 cluster with YARN 2.2.0. I've previously run against the same data set with Bagel and it works great.

ccsevers commented 10 years ago

To make it simpler, you can change the input to something like: val input = sc.parallelize(1 to 100000000, 400), adjust the flatMap input accordingly, and you will see the same error.

edit: Also, oddly, I can run it with, say, 11 splits on 10 workers and it goes through. I see the same End of stream error in the logs, but it just fails that task and keeps going.

ankurdave commented 10 years ago

Thanks for the report. I haven't yet been able to reproduce this. How many cores does Spark have in your configuration?

ccsevers commented 10 years ago

@ankurdave I've tried it with 4-8 cores. I can try with 1 if you think it would help pin down what's going on.

edit: Just to be clear, I mean 10-20 worker nodes with 4-8 cores each (the machines have 24 cores each, though).