vesoft-inc / nebula-algorithm

Nebula-Algorithm is a Spark application based on GraphX that enables state-of-the-art graph algorithms to run on top of NebulaGraph and write results back to NebulaGraph.

Louvain execution fails with java.lang.ArrayIndexOutOfBoundsException: -1 #5

Closed stormrise closed 2 years ago

stormrise commented 2 years ago

The edge data comes from Hive, submitted via spark-submit, using nebula-algorithm:2.5-snapshot.

Running on a small sample of the data succeeded, but running on the full dataset fails with the error below.
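
For context, a minimal sketch of how an edge DataFrame read from Hive is handed to Louvain in nebula-algorithm (based on the library's documented API; the Hive table name, column names, and LouvainConfig parameters here are placeholders, not the actual job configuration):

```scala
import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.config.LouvainConfig
import com.vesoft.nebula.algorithm.lib.LouvainAlgo

object LouvainFromHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("louvain-from-hive")
      .enableHiveSupport()            // read the edge list directly from Hive
      .getOrCreate()

    // Placeholder table/column names; the job selects src, dst and cnt columns.
    val edges = spark.sql("SELECT src, dst, cnt FROM example_db.example_edges")
    edges.show()
    edges.printSchema()

    // LouvainConfig(maxIter, internalIter, tol); values here are illustrative only.
    val louvainConfig = LouvainConfig(5, 10, 0.5)

    // The last argument toggles whether the third column (cnt) is used as edge weight.
    val result = LouvainAlgo.apply(spark, edges, louvainConfig, true)
    result.show()
  }
}
```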

Below is an excerpt from the log:

+---------+----+---+
| src| dst|cnt|
+---------+----+---+
| 28196091|1438| 2|
|126323223|1755| 1|
|143460727|1839| 1|
| 3162450|1983| 2|
|150438365|2295| 1|
|147848206|2620| 1|
|127127870|3346| 1|
|148098254|3346| 1|
|126247907|3346| 2|
| 33395919|3535| 19|
|147839265|4127| 1|
|123199945|4372| 3|
|143852884|4538| 4|
| 9195663|4545| 2|
| 9309219|5639| 2|
| 16596782|5704| 1|
|128692789|5756| 1|
|131615056|5845| 1|
| 34714949|5845| 2|
| 9577564|5845| 8|
+---------+----+---+
only showing top 20 rows

root
|-- src: integer (nullable = true)
|-- dst: integer (nullable = true)
|-- cnt: integer (nullable = true)
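
The src/dst columns above are 32-bit integers; GraphX vertex ids are Longs, so they are widened when the graph is built. Roughly, the DataFrame-to-Graph conversion looks like the sketch below (an assumption for illustration only, not necessarily nebula-algorithm's exact code):

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Sketch only: turn a (src, dst, cnt) DataFrame into a GraphX Graph.
def toGraph(edgeDf: DataFrame): Graph[None.type, Double] = {
  val edgeRdd: RDD[Edge[Double]] = edgeDf.rdd.map { row =>
    Edge(row.get(0).toString.toLong,    // src -> VertexId (Long)
         row.get(1).toString.toLong,    // dst -> VertexId (Long)
         row.get(2).toString.toDouble)  // cnt -> edge weight
  }
  Graph.fromEdges(edgeRdd, None)        // default vertex attribute
}
```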

......

21/11/02 20:29:19 INFO spark.SparkContext: Created broadcast 19 from broadcast at LouvainAlgo.scala:68
21/11/02 20:29:19 INFO lib.LouvainAlgo$: ============================== step 1 =======================
21/11/02 20:29:19 INFO spark.SparkContext: Starting job: count at LouvainAlgo.scala:111
21/11/02 20:29:19 INFO scheduler.DAGScheduler: Registering RDD 113 (mapPartitions at GraphImpl.scala:208) as input to shuffle 13
21/11/02 20:29:19 INFO scheduler.DAGScheduler: Registering RDD 122 (mapPartitions at VertexRDDImpl.scala:247) as input to shuffle 12
21/11/02 20:29:19 INFO scheduler.DAGScheduler: Registering RDD 127 (flatMap at LouvainAlgo.scala:210) as input to shuffle 15
21/11/02 20:29:19 INFO scheduler.DAGScheduler: Registering RDD 130 (flatMap at LouvainAlgo.scala:251) as input to shuffle 14
21/11/02 20:29:19 INFO scheduler.DAGScheduler: Got job 4 (count at LouvainAlgo.scala:111) with 200 output partitions
21/11/02 20:29:19 INFO scheduler.DAGScheduler: Final stage: ResultStage 39 (count at LouvainAlgo.scala:111)

......

21/11/02 20:36:25 INFO cluster.YarnScheduler: Removed TaskSet 55.0, whose tasks have all completed, from pool
21/11/02 20:36:25 INFO scheduler.DAGScheduler: ShuffleMapStage 55 (mapPartitions at VertexRDDImpl.scala:247) finished in 11.880 s
21/11/02 20:36:25 INFO scheduler.DAGScheduler: looking for newly runnable stages
21/11/02 20:36:25 INFO scheduler.DAGScheduler: running: Set()
21/11/02 20:36:25 INFO scheduler.DAGScheduler: waiting: Set(ShuffleMapStage 56, ResultStage 57)
21/11/02 20:36:25 INFO scheduler.DAGScheduler: failed: Set()

21/11/02 20:36:25 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 56 (GraphImpl.aggregateMessages - preAgg MapPartitionsRDD[165] at mapPartitions at GraphImpl.scala:208), which has no missing parents
21/11/02 20:36:25 INFO memory.MemoryStore: Block broadcast_28 stored as values in memory (estimated size 80.6 KB, free 1184.3 MB)
21/11/02 20:36:25 INFO memory.MemoryStore: Block broadcast_28_piece0 stored as bytes in memory (estimated size 33.2 KB, free 1184.3 MB)
21/11/02 20:36:25 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in memory on VMS136198:26469 (size: 33.2 KB, free: 1185.4 MB)
21/11/02 20:36:25 INFO spark.SparkContext: Created broadcast 28 from broadcast at DAGScheduler.scala:1163
21/11/02 20:36:25 INFO scheduler.DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 56 (GraphImpl.aggregateMessages - preAgg MapPartitionsRDD[165] at mapPartitions at GraphImpl.scala:208) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
21/11/02 20:36:25 INFO cluster.YarnScheduler: Adding task set 56.0 with 200 tasks

......

21/11/02 20:36:34 WARN scheduler.TaskSetManager: Lost task 56.0 in stage 56.0 (TID 3387, , executor 20): java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:71)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Regarding this error, there is a related JIRA issue: https://issues.apache.org/jira/browse/SPARK-5480