Mellanox / SparkRDMA

This is archive of SparkRDMA project. The new repository with RDMA shuffle acceleration for Apache Spark is here: https://github.com/Nvidia/sparkucx
Apache License 2.0
240 stars 70 forks source link

Fail to write HDFS with custom codec when using SparkRDMA #33

Closed tobegit3hub closed 5 years ago

tobegit3hub commented 5 years ago

We have used Spark RDMA shuffle manager to replace the default shuffle manager. But it throw MetadataFetchFailedException. Here is the error log.

19/05/22 10:23:31 INFO disni: createCompChannel, context 139795900960400
19/05/22 10:23:31 INFO disni: createCQ, objId 139794752991072, ncqe 1
19/05/22 10:23:31 INFO disni: createQP, objId 139796347838856, send_wr size 0, recv_wr_size 0
19/05/22 10:23:31 INFO disni: accept, id 0
19/05/22 10:23:31 INFO ProphetInstanceStatistics: create OutputStream for part 198, output file is :hdfs:///prophet/autoui/360leap-ptest/workspace/test/pws/1/FeatureExtract/455a1e51160e4798bf13b18f50ad7d09/36/c5bbce26-01b2-4731-af13-871bb635c751/0/1558491641346/signValueMap/part-198-_w0
19/05/22 10:23:31 ERROR FeSpark$$anon$1: Failed to process 198
19/05/22 10:23:31 ERROR FeSpark$$anon$1: processor failed
org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(138, node3.leap.com, 36778, None) has no RDMA connection to BlockManagerId(36, node5.leap.com, 41465, None)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
19/05/22 10:23:31 INFO RdmaNode: Established connection to /172.27.193.1:60672 in 983 ms
19/05/22 10:23:31 ERROR Executor: Exception in task 198.0 in stage 4.0 (TID 774)
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5.apply(RDD.scala:372)
        at org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5.apply(RDD.scala:372)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
19/05/22 10:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 897
19/05/22 10:23:31 INFO Executor: Running task 321.0 in stage 4.0 (TID 897)
19/05/22 10:23:31 INFO Executor: Executor is trying to kill task 321.0 in stage 4.0 (TID 897), reason: Stage cancelled

It seems that the RDMA connection is not established but it will work with other SparkSQL jobs. We are not sure if it may be the bug of SparkRDMA.

tobegit3hub commented 5 years ago

Now we got the error when write data to HDFS and not sure if it is related to SparkRDMA. It will work with the default Spark shuffle manager.

19/05/23 11:31:54 WARN TaskSetManager: Lost task 88.0 in stage 4.0 (TID 1210, node8.leap.com, executor 65): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: reached end of stream after reading 9056 bytes; 892482608 bytes expected
    at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    ... 8 more

And the RDMA will throw NPE because of the above error.

19/05/23 11:31:55 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
    at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:208)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:113)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
19/05/23 11:31:55 INFO RdmaChannel: Stopping RdmaChannel RdmaChannel(9) 
19/05/23 11:31:55 INFO RdmaChannel: Stopping RdmaChannel RdmaChannel(66) 
19/05/23 11:31:55 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
    at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:208)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:113)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
19/05/23 11:31:55 ERROR RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
19/05/23 11:31:55 ERROR RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
tobegit3hub commented 5 years ago

We found that this issue is only related to the Spark API of saveAsTextFile.

If we choose the official codec such as GzipCodec and Lz4Codec like rdd.saveAsTextFile(outputPath, classOf[GzipCodec]), it works like a charm. However, we got the above error when we use the custom codes like rdd.saveAsTextFile(outputPath, classOf[MyCustomCodec]).

So we are curious if the SparkRDMA only supports some official codec but not other custom codec. By the way, we have set spark.shuffle.spill.compress=false and spark.shuffle.compress=false to submit the Spark jobs.

tobegit3hub commented 5 years ago

It only happens at the last stage to save the files with our custom codec.

petro-rudenko commented 5 years ago

What's the custom codec do you use? Is it open source?

tobegit3hub commented 5 years ago

Thanks @petro-rudenko . It is not open source yet but I think we can wrap the open source implementation and test with it first.

tobegit3hub commented 5 years ago

Hi @petro-rudenko , we still got this error when using SparkRDMA.

The exception is from reading the shuffle data before using our custom codec. Here is the error log of https://issues.apache.org/jira/browse/SPARK-21172 . We have re-compile the Spark and print the row size from shuffle and got 0 for most row and the super large value for the last row which will throw the exception and fail the task https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala#L120 .

tobegit3hub commented 5 years ago

The first issue is not relevant to the last one. I'm gonna close this issue and open another one tor track that.