Mellanox / SparkRDMA

This is an archive of the SparkRDMA project. The new repository with RDMA shuffle acceleration for Apache Spark is here: https://github.com/Nvidia/sparkucx
Apache License 2.0

spark rdma IBV_WC_WR_FLUSH_ERR #23

Open xluckly opened 5 years ago

xluckly commented 5 years ago

I am running Spark 2.1.0 on YARN with Hadoop 2.7.3.

My Spark job runs correctly, but when the data is large, RdmaShuffleManager fails with an error. Please check the logs below.

Spark RDMA configuration:

spark.driver.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
spark.executor.extraLibraryPath /home/bigdata/local/rdma
spark.driver.extraLibraryPath /home/bigdata/local/rdma
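The same properties can also be kept in `spark-defaults.conf`, which takes one whitespace-separated `key value` pair per line. The Python sketch below renders the reporter's settings in that format. The helper name `render_spark_defaults` is hypothetical, and the jar and library paths are simply copied from the configuration above.

```python
# Minimal sketch: render the RDMA shuffle settings from this issue as
# spark-defaults.conf lines ("key<whitespace>value", one per line).
# render_spark_defaults is a hypothetical helper, not part of Spark or SparkRDMA.

RDMA_JAR = "/home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar"
RDMA_LIB_DIR = "/home/bigdata/local/rdma"

# Properties copied verbatim from the reporter's configuration.
RDMA_CONF = {
    "spark.driver.extraClassPath": RDMA_JAR,
    "spark.executor.extraClassPath": RDMA_JAR,
    "spark.shuffle.manager": "org.apache.spark.shuffle.rdma.RdmaShuffleManager",
    "spark.shuffle.compress": "false",
    "spark.shuffle.spill.compress": "false",
    "spark.broadcast.compress": "false",
    "spark.broadcast.checksum": "false",
    "spark.locality.wait": "0",
    "spark.executor.extraLibraryPath": RDMA_LIB_DIR,
    "spark.driver.extraLibraryPath": RDMA_LIB_DIR,
}


def render_spark_defaults(conf: dict) -> str:
    """Return one 'key value' line per property, in insertion order."""
    return "\n".join(f"{key} {value}" for key, value in conf.items()) + "\n"


if __name__ == "__main__":
    print(render_spark_defaults(RDMA_CONF), end="")
```

Keeping the settings in one file avoids repeating the jar path on every launch; the values themselves are unchanged from the issue.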

logs:

INFO DAGScheduler: Executor lost: 2 (epoch 29)
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, jx-bd-hadoop523, 35995, None)
19/02/18 10:18:10 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
19/02/18 10:18:10 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 29)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 5 is now unavailable on executor 2 (0/1, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 9 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 2 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 4 is now unavailable on executor 2 (0/2, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 1 is now unavailable on executor 2 (0/500, false)
19/02/18 10:18:12 WARN TransportChannelHandler: Exception in connection from /10.200.20.213:45426
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 2.
19/02/18 10:18:12 ERROR YarnScheduler: Lost an executor 2 (already removed): Pending loss reason.
19/02/18 10:18:12 INFO BlockManagerMaster: Removal of executor 2 requested
19/02/18 10:18:12 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked to remove non-existent executor 2

19/02/18 10:17:42 INFO MemoryStore: Block broadcast_28 stored as values in memory (estimated size 39.9 KB, free 7.0 GB)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.200.20.213:41323)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 0 of size 8192 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 0 took 4 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 19.000389 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 13.038783 ms
19/02/18 10:17:42 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 10.63309 ms
19/02/18 10:17:42 INFO DiskBlockManager: Shutdown hook called
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 12, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.200.20.213:41323)
19/02/18 10:17:42 INFO ShutdownHookManager: Shutdown hook called
19/02/18 10:17:42 ERROR Executor: Exception in task 14.0 in stage 9.0 (TID 1518)
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 12.0 in stage 9.0 (TID 1517)
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 8.0 in stage 9.0 (TID 1513)
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 12 of size 16384 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 12 took 0 ms
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1520
19/02/18 10:17:42 INFO Executor: Running task 1.0 in stage 3.0 (TID 1520)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1521
19/02/18 10:17:42 INFO Executor: Running task 2.0 in stage 3.0 (TID 1521)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1522
19/02/18 10:17:42 INFO Executor: Running task 3.0 in stage 3.0 (TID 1522)
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 23.898935 ms

19/02/18 10:44:51 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on jx-bd-hadoop528:41859 (size: 3.8 KB, free: 7.3 GB)
19/02/18 10:44:51 INFO disni: createCompChannel, context 140379227583536
19/02/18 10:44:51 INFO disni: createCQ, objId 140369536012256, ncqe 4352
19/02/18 10:44:51 INFO disni: createQP, objId 140369536014024, send_wr size 4096, recv_wr_size 256
19/02/18 10:44:51 INFO disni: accept, id 0
19/02/18 10:44:51 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.200.20.218:45240
19/02/18 10:44:52 WARN TaskSetManager: Lost task 0.0 in stage 5.2 (TID 8, jx-bd-hadoop528 executor 37): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528, 37576, None)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
)
19/02/18 10:44:52 INFO TaskSetManager: Task 0.0 in stage 5.2 (TID 8) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
19/02/18 10:44:52 INFO DAGScheduler: Marking ResultStage 5 (processCmd at CliDriver.java:376) as failed due to a fetch failure from ShuffleMapStage 4 (processCmd at CliDriver.java:376)
19/02/18 10:44:52 INFO YarnScheduler: Removed TaskSet 5.2, whose tasks have all completed, from pool
19/02/18 10:44:52 INFO DAGScheduler: ResultStage 5 (processCmd at CliDriver.java:376) failed in 1.026 s due to org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528.zeus, 37576, None)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
    at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

INFO TaskSetManager: Task 134.0 in stage 13.0 (TID 2417) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
WARN TaskSetManager: Lost task 33.0 in stage 13.0 (TID 2333, jx-bd-hadoop526, executor 26): FetchFailed(BlockManagerId(48, jx-bd-hadoop530, 42158, None), shuffleId=14, mapId=0, reduceId=33, message=
org.apache.spark.shuffle.FetchFailedException: RDMA Send/Write/Read WR completed with error: IBV_WC_WR_FLUSH_ERR

petro-rudenko commented 5 years ago

Hi, how many physical nodes do you have, and what hardware do they have (number of CPUs, RAM, NIC, disk)? It looks like you are running out of resources. Could you please try running with spark.shuffle.rdma.useOdp set to true?
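A minimal sketch of applying this suggestion on top of an existing configuration, assuming the job is launched with `spark-submit`, which accepts repeated `--conf key=value` options. The helper `build_submit_args`, the `--master yarn` choice, and the `your-app.jar` placeholder are illustrative assumptions; the same property could equally be added to `spark-defaults.conf`.

```python
# Minimal sketch: turn Spark properties into spark-submit "--conf key=value"
# arguments, layering spark.shuffle.rdma.useOdp=true over the base settings.
# build_submit_args and "your-app.jar" are hypothetical placeholders.

from typing import Dict, List


def build_submit_args(base_conf: Dict[str, str],
                      overrides: Dict[str, str],
                      app: str) -> List[str]:
    merged = {**base_conf, **overrides}  # override values win on key collisions
    args = ["spark-submit", "--master", "yarn"]
    for key, value in merged.items():
        args += ["--conf", f"{key}={value}"]
    args.append(app)
    return args


base = {
    "spark.shuffle.manager": "org.apache.spark.shuffle.rdma.RdmaShuffleManager",
    "spark.shuffle.compress": "false",
}
args = build_submit_args(base, {"spark.shuffle.rdma.useOdp": "true"}, "your-app.jar")
print(" ".join(args))
```

Merging with a dict keeps each property on a single `--conf`, so an override replaces a base value instead of being passed twice.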

xluckly commented 5 years ago


Nodes: 10
CPUs: 10
RAM: 192 GB
Network bandwidth: 20G

xluckly commented 5 years ago


With spark.shuffle.rdma.useOdp set to true, the problem still exists.

petro-rudenko commented 5 years ago

Do you have the YARN logs for the executors?
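On YARN with log aggregation enabled, executor logs can usually be retrieved with `yarn logs -applicationId <application id>`. As a rough first pass over such output, the hypothetical Python helper below counts how often the failure messages quoted in this thread appear. The signature list is an assumption drawn from the logs above, not a list published by SparkRDMA.

```python
# Minimal sketch: count the failure signatures seen in this thread inside
# aggregated YARN container logs. SIGNATURES and count_signatures are
# illustrative names, not part of Spark, YARN, or SparkRDMA.

import sys
from collections import Counter
from typing import Iterable, Sequence

SIGNATURES = (
    "RECEIVED SIGNAL TERM",
    "Filesystem closed",
    "Connection reset by peer",
    "has no RDMA connection",
    "IBV_WC_WR_FLUSH_ERR",
)


def count_signatures(lines: Iterable[str],
                     signatures: Sequence[str] = SIGNATURES) -> Counter:
    """Return how many lines contain each signature (zero if never seen)."""
    counts = Counter({sig: 0 for sig in signatures})
    for line in lines:
        for sig in signatures:
            if sig in line:
                counts[sig] += 1
    return counts


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python count_signatures.py saved_yarn_logs.txt
    with open(sys.argv[1], encoding="utf-8", errors="replace") as log:
        for sig, n in count_signatures(log).items():
            print(f"{n:6d}  {sig}")
```

Seeing which message appears first, and on which container, may help tell whether the RDMA errors are a cause or a side effect of executors being killed.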