Mellanox / SparkRDMA

This is an archive of the SparkRDMA project. The new repository with RDMA shuffle acceleration for Apache Spark is here: https://github.com/Nvidia/sparkucx

SparkRDMA issue: ERROR scheduler.TaskSetManager: Task 45 in stage 1.0 failed 4 times; aborting job #17

Closed XianlaiShen closed 5 years ago

XianlaiShen commented 5 years ago

bench.log

Hi, I attached the log for your reference. Please help me check it, thanks!

My servers are ARMv8 64-bit machines (Qualcomm ARM servers), so I downloaded MLNX_OFED_LINUX-4.4-2.0.7.0-rhel7.5alternate-aarch64.tgz, compiled and installed it, and used perftest to confirm that physical RDMA works. I set up two nodes with Hadoop (hadoop-2.7.1) and Spark (spark-2.2.0-bin-hadoop2.7, standalone) on the ARMv8 64-bit servers and used the HiBench-7.0 terasort case to validate SparkRDMA. At first I ran it with Spark on YARN (Dynamic Resource Allocation), but it failed; since you run it successfully with Spark in standalone mode, I switched to standalone mode. I built libdisni version 1.7 (git checkout tags/v1.7 -b v1.7) and configured SparkRDMA as below:

spark.driver.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.driver.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.executor.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0

The issue is:

18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 45.3 in stage 1.0 (TID 241) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 175]
18/11/30 15:03:26 ERROR scheduler.TaskSetManager: Task 45 in stage 1.0 failed 4 times; aborting job
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
18/11/30 15:03:26 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (map at ScalaTeraSort.scala:49) failed in 5.189 s due to Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.<init>(RdmaMappedFile.java:88)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
    at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    ... 18 more

Driver stacktrace:
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 1.0 (TID 242) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 176]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 17.0 in stage 1.0 (TID 63) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 177]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 8.2 in stage 1.0 (TID 236) on 192.168.5.136, executor 4: java.lang.reflect.InvocationTargetException (null) [duplicate 178]
18/11/30 15:03:26 INFO scheduler.DAGScheduler: Job 1 failed: runJob at SparkHadoopMapReduceWriter.scala:88, took 5.472082 s
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 89.0 in stage 1.0 (TID 135) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 179]
18/11/30 15:03:26 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20181130150320_0006.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.<init>(RdmaMappedFile.java:88)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
    at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    ... 18 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
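The root failure here is java.io.IOException: Invalid argument thrown from sun.nio.ch.FileChannelImpl.map0, i.e. the underlying mmap call returned EINVAL. One plausible cause on this platform (an assumption, not something confirmed anywhere in this thread) is page-size alignment: mmap requires the file offset to be a multiple of the OS page size, and while x86_64 kernels use 4 KB pages, many aarch64 kernels, including the RHEL 7 alternate-architecture builds this MLNX_OFED package targets, use 64 KB pages. A minimal, SparkRDMA-independent probe for the page size the JVM sees:

import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Diagnostic sketch (not part of SparkRDMA): print the OS page size as seen
// by the JVM. mmap offsets must be multiples of this value, so a 65536 here
// combined with 4 KB-aligned offsets would explain an EINVAL from map0.
public class PageSizeProbe {
    public static void main(String[] args) throws Exception {
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);                 // theUnsafe is a private field
        Unsafe unsafe = (Unsafe) f.get(null);  // static singleton instance
        System.out.println("OS page size = " + unsafe.pageSize() + " bytes");
    }
}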

petro-rudenko commented 5 years ago

Hi, thanks for using SparkRDMA. For now we've tested only on x86. What Java version do you use? I'll check on an ARM server and let you know.

XianlaiShen commented 5 years ago

Hi Rudenko, thanks for your quick reply! The Java version I use is openjdk "1.8.0_191" (java-1.8.0-openjdk-1.8.0.191.b12-0.el7_5.aarch64). I have now updated SparkRDMA to version 3.1, and the issue still exists. Please help check it on an ARM server, thanks a lot!

Xianlai Shen Huaxintong Semiconductor Technology Co., Ltd. E-mail: xianlai.shen@hxt-semitech.com

XianlaiShen commented 5 years ago

Hi Rudenko, I use CentOS 7.5 (4.14.62-5.hxt.aarch64) on the ARM server. Thanks!

Xianlai Shen Huaxintong Semiconductor Technology Co., Ltd. E-mail: xianlai.shen@hxt-semitech.com

XianlaiShen commented 5 years ago

bench_20181204.log

Hi Rudenko, attached is the issue log with SparkRDMA-3.1 for reference. Thanks!

Xianlai Shen Huaxintong Semiconductor Technology Co., Ltd. Shanghai, China. E-mail: xianlai.shen@hxt-semitech.com

petro-rudenko commented 5 years ago

Hi, can you please try the following program Main.java:

import sun.nio.ch.FileChannelImpl;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.channels.FileChannel;

public class Main {
    // Look up the private map0/unmap0 natives on FileChannelImpl via
    // reflection - the same entry point the RdmaMappedFile stack trace hits.
    private static final Method mmap;
    private static final Method unmmap;

    static {
        try {
            mmap = FileChannelImpl.class.getDeclaredMethod("map0", int.class, long.class, long.class);
            mmap.setAccessible(true);
            unmmap = FileChannelImpl.class.getDeclaredMethod("unmap0", long.class, long.class);
            unmmap.setAccessible(true);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws IOException, InvocationTargetException, IllegalAccessException {
        final RandomAccessFile backingFile = new RandomAccessFile("testMmap", "rw");
        FileChannel fileChannel = backingFile.getChannel();
        long alignedLength = 339968L; // 83 * 4096, i.e. 4 KB-aligned

        // map0(prot, position, length); prot 1 is MAP_RW in OpenJDK 8
        long mapAddress = (long) mmap.invoke(fileChannel, 1, 0L, alignedLength);
        System.out.println("Map address = " + mapAddress);

        unmmap.invoke(null, mapAddress, alignedLength);
        fileChannel.close();
    }
}
$ head -c 10M </dev/urandom > testMmap
$ javac Main.java
$ java Main

I'm running CentOS on QEMU ARMv7 emulation, and it works.
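Note that Main.java maps at offset 0, which is aligned for every page size, so it can succeed even on a machine where mapping a shuffle-file segment at a nonzero offset fails. If page size is the culprit (again, an assumption rather than something verified here), the distinguishing fact is that mmap rejects offsets that are not multiples of the OS page size, so an offset that is only 4 KB-aligned is invalid on a 64 KB-page kernel. A small self-contained check of that arithmetic, with example offsets that are hypothetical rather than taken from SparkRDMA:

// Hypothetical diagnostic, not part of Main.java: mmap requires the file
// offset to be a multiple of the OS page size, so an offset that works on a
// 4 KB-page x86 kernel can yield EINVAL ("Invalid argument") on a 64 KB-page
// aarch64 kernel. Offset 0, as used by Main.java, is aligned everywhere.
public class AlignCheck {
    public static void main(String[] args) {
        long[] offsets = {0L, 4096L, 339968L};         // example offsets only
        for (int pageSize : new int[] {4096, 65536}) { // 4 KB vs. 64 KB pages
            for (long off : offsets) {
                System.out.println("page size " + pageSize + ", offset " + off
                        + ": aligned = " + (off % pageSize == 0));
            }
        }
    }
}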

Also, can you please specify which terasort configuration you use (hibench.scale.profile, hibench.default.map.parallelism, hibench.default.shuffle.parallelism)? Can you start with hibench.scale.profile set to small, to make sure it's working?
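For reference, these knobs live in HiBench's conf/hibench.conf. The values below are only illustrative placeholders showing where they are set, not the reporter's actual configuration:

# HiBench-7.0/conf/hibench.conf (illustrative values only)
hibench.scale.profile               small
hibench.default.map.parallelism     8
hibench.default.shuffle.parallelism 8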

XianlaiShen commented 5 years ago

Main_java.log bench_20181205_small.log bench_20181205_tiny.log

Hi Peter, thanks a lot! I ran your test program Main.java, and it printed "Map address = 281472493944832" after "java Main"; Main_java.log is attached. I also ran hibench terasort with hibench.scale.profile small (hibench.terasort.small.datasize 3200000) and hibench.scale.profile tiny (hibench.terasort.tiny.datasize 32000). There is still some ERROR output, "ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException"; details below, and bench_20181205_small.log and bench_20181205_tiny.log are attached.

From bench_20181205_tiny.log:

18/12/05 10:03:51 INFO rdma.RdmaChannel: Stopping RdmaChannel RdmaChannel(0)
18/12/05 10:03:51 INFO rdma.RdmaChannel: Stopping RdmaChannel RdmaChannel(5)
18/12/05 10:03:51 INFO rdma.RdmaChannel: Stopping RdmaChannel RdmaChannel(8)
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
java.lang.NullPointerException
    at org.apache.spark.shuffle.rdma.RdmaChannel.processRdmaCmEvent(RdmaChannel.java:345)
    at org.apache.spark.shuffle.rdma.RdmaChannel.stop(RdmaChannel.java:894)
    at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:203)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "RdmaNode connection listening thread" java.lang.RuntimeException: Exception in RdmaNode listening thread java.lang.NullPointerException
    at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:210)
    at java.lang.Thread.run(Thread.java:748)
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 INFO ibm.disni: disconnect, id 0
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 ERROR rdma.RdmaNode: java.util.concurrent.ExecutionException: java.lang.NullPointerException
18/12/05 10:03:51 INFO rdma.RdmaBufferManager: Rdma buffers allocation statistics:
18/12/05 10:03:51 INFO rdma.RdmaBufferManager: Pre allocated 0, allocated 2562 buffers of size 4 KB
18/12/05 10:03:51 INFO ibm.disni: deallocPd, pd 1
18/12/05 10:03:51 INFO ibm.disni: destroyCmId, id 0
18/12/05 10:03:51 INFO ibm.disni: destroyEventChannel, channel 0
18/12/05 10:03:51 INFO memory.MemoryStore: MemoryStore cleared
18/12/05 10:03:51 INFO storage.BlockManager: BlockManager stopped
18/12/05 10:03:51 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
18/12/05 10:03:51 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/12/05 10:03:51 INFO spark.SparkContext: Successfully stopped SparkContext
18/12/05 10:03:51 INFO util.ShutdownHookManager: Shutdown hook called
18/12/05 10:03:51 INFO util.ShutdownHookManager: Deleting directory /home/xianlai/hadoop_tmp1/local/spark-e6d43841-11d0-4f88-a874-26f848730c08

Xianlai Shen Huaxintong Semiconductor Technology Co., Ltd. Shanghai, China. E-mail: xianlai.shen@hxt-semitech.com

fengshuangeme commented 5 years ago

learn it

petro-rudenko commented 5 years ago

Hi, from your log I see the job finished: 18/12/05 10:01:31 INFO scheduler.DAGScheduler: Job 1 finished: runJob at SparkHadoopMapReduceWriter.scala:88, took 4.270173 s. You should see the job result in report/hibench.report.

The error you got is not fatal - the job succeeded. I made a PR to fix it.
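For context, the NullPointerException is raised while the connection-listening thread processes a connection-manager event during teardown (RdmaChannel.processRdmaCmEvent called from RdmaChannel.stop in the trace above). Below is a minimal, purely illustrative model of that kind of shutdown race - hypothetical code, not the actual SparkRDMA source and not the contents of the PR: an event poll that returns nothing once the peer has already disconnected must be null-checked before use.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the shutdown race, not SparkRDMA source: a listener
// thread polls for connection events, but during teardown the poll can come
// back null; using it without a guard raises the same kind of
// NullPointerException seen in the log above.
public class ShutdownRaceDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> events = new LinkedBlockingQueue<>();
        events.add("DISCONNECTED");  // last event before the peer goes away
        Thread listener = new Thread(() -> {
            while (true) {
                String event;
                try {
                    event = events.poll(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    return;
                }
                if (event == null) { // guard: poll timed out, channel is gone
                    return;          // exit quietly instead of dereferencing
                }
                System.out.println("processing event: " + event);
            }
        }, "listener-model");
        listener.start();
        listener.join();
    }
}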

What configuration did you use when you got the initial error?

XianlaiShen commented 5 years ago

bench_terasort_large_error_20181206.log

configuration_files_and_error_log.zip

Hi Peter, thanks very much! For the hibench terasort tiny and small workloads there are results in report/hibench.report and the run shell reports success; only the NullPointerException shows up in the logs. So the SparkRDMA function looks good on our ARM server (HXT-1) for the terasort small workloads - that's great! For the initial issue, I only changed hibench.scale.profile to large (hibench.terasort.large.datasize 32000000) or huge (hibench.terasort.huge.datasize 320000000); all other settings are the same as for the terasort small workloads. I attached the key configuration files (configuration_files_and_error_log.zip) for hibench.scale.profile large: HiBench-7.0/conf/hibench.conf, HiBench-7.0/conf/Hadoop.conf, HiBench-7.0/conf/spark.conf, spark-2.2.0-bin-hadoop2.7/conf/spark-defaults.conf, and spark-2.2.0-bin-hadoop2.7/conf/spark-env.sh. I set spark.driver.extraJavaOptions and spark.executor.extraJavaOptions to -Djava.library.path=/home/xianlai/hadoop-2.7.1/lib/native/, because I copied all the libdisni libraries from /usr/local/lib to /home/xianlai/hadoop-2.7.1/lib/native. With only hibench.scale.profile changed to large and everything else the same as the small workloads, the error log is below; details are in the attachment bench_terasort_large_error_20181206.log.

18/12/06 10:15:39 INFO scheduler.TaskSetManager: Lost task 13.3 in stage 1.0 (TID 171) on 192.168.5.135, executor 9: java.lang.reflect.InvocationTargetException (null) [duplicate 121]
18/12/06 10:15:39 ERROR scheduler.TaskSetManager: Task 13 in stage 1.0 failed 4 times; aborting job
18/12/06 10:15:39 INFO scheduler.TaskSetManager: Lost task 23.3 in stage 1.0 (TID 170) on 192.168.5.135, executor 9: java.lang.reflect.InvocationTargetException (null) [duplicate 122]
18/12/06 10:15:39 INFO scheduler.TaskSetManager: Lost task 43.2 in stage 1.0 (TID 169) on 192.168.5.135, executor 9: java.lang.reflect.InvocationTargetException (null) [duplicate 123]
18/12/06 10:15:39 INFO scheduler.TaskSetManager: Lost task 6.3 in stage 1.0 (TID 175) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 124]
18/12/06 10:15:39 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
18/12/06 10:15:39 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
18/12/06 10:15:39 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (map at ScalaTeraSort.scala:49) failed in 6.458 s due to Job aborted due to stage failure: Task 13 in stage 1.0 failed 4 times, most recent failure: Lost task 13.3 in stage 1.0 (TID 171, 192.168.5.135, executor 9): java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:150)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:109)
    at org.apache.spark.shuffle.rdma.RdmaMappedFile.<init>(RdmaMappedFile.java:87)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
    at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
    at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    ... 18 more

Driver stacktrace:
18/12/06 10:15:39 INFO scheduler.DAGScheduler: Job 1 failed: runJob at SparkHadoopMapReduceWriter.scala:88, took 6.845763 s
18/12/06 10:15:39 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20181206101532_0006.

Xianlai Shen Huaxintong Semiconductor Technology Co., Ltd. Shanghai, China. E-mail: xianlai.shen@hxt-semitech.com

petro-rudenko commented 5 years ago

Marking as fixed; PR #19 is merged. Let's continue the discussion on the mailing list for any further issues.