Mellanox / SparkRDMA

This is an archive of the SparkRDMA project. The new repository with RDMA shuffle acceleration for Apache Spark is here: https://github.com/Nvidia/sparkucx
Apache License 2.0

SparkRDMA: HiBench not able to run #20

Closed caiqingsong closed 5 years ago

caiqingsong commented 5 years ago

Spark 2.2.0, Hadoop 2.7, and HiBench hit the issue below, but the basic Spark DiSNI RDMA benchmark is able to run.

Please take a look, thanks very much.

Stack: [0x00007fe4b42e3000,0x00007fe4b43e4000], sp=0x00007fe4b43e20c8, free space=1020k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
C [libc.so.6+0x147ce5] __memcpy_ssse3_back+0x45
C 0x00000000007e7ea8

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
j com.ibm.disni.rdma.verbs.impl.NativeDispatcher._connect(JJ)I+0
j com.ibm.disni.rdma.verbs.impl.RdmaCmNat.connect(Lcom/ibm/disni/rdma/verbs/RdmaCmId;Lcom/ibm/disni/rdma/verbs/RdmaConnParam;)I+45
j com.ibm.disni.rdma.verbs.RdmaCmId.connect(Lcom/ibm/disni/rdma/verbs/RdmaConnParam;)I+6
j org.apache.spark.shuffle.rdma.RdmaChannel.connect(Ljava/net/InetSocketAddress;)V+207
j org.apache.spark.shuffle.rdma.RdmaNode.getRdmaChannel(Ljava/net/InetSocketAddress;Z)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+148
j org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannel(Ljava/lang/String;IZ)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+20
j org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannel(Lorg/apache/spark/shuffle/rdma/RdmaShuffleManagerId;Z)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+10
j org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$1$$anonfun$onSuccess$2.apply()Lorg/apache/spark/shuffle/rdma/RdmaChannel;+15
j org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$1$$anonfun$onSuccess$2.apply()Ljava/lang/Object;+1
j scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1()Lscala/util/Try;+8
j scala.concurrent.impl.Future$PromiseCompletingRunnable.run()V+5
j scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec()Z+4
j scala.concurrent.forkjoin.ForkJoinTask.doExec()I+10
j scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V+10
j scala.concurrent.forkjoin.ForkJoinPool.runWorker(Lscala/concurrent/forkjoin/ForkJoinPool$WorkQueue;)V+11
j scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V+14
v ~StubRoutines::call_stub

yuvaldeg commented 5 years ago

Thanks for reporting this issue, caiqingsong.

Can you please share some more information about your system, and maybe share the executor logs as well? Also, some info on exactly what test was run and with what configuration. We will try to reproduce the issue internally at Mellanox. Also, have you built DiSNI on your own? If not, I suggest you try that, in case the pre-built binary is not compatible with your OS. Can you also please comment on which tests did work for you with SparkRDMA?

caiqingsong commented 5 years ago

Hi Yuvaldeg,

Thanks for the feedback.
For the detailed log, please see the attachment.
Yes, I've rebuilt DiSNI on my system.
I can run the benchmarks below:
java -cp disni-1.7-jar-with-dependencies.jar:disni-1.7-tests.jar com.ibm.disni.benchmarks.ReadServer  -a 192.168.100.211  -s 64 -k 100000
java -cp disni-1.7-jar-with-dependencies.jar:disni-1.7-tests.jar com.ibm.disni.benchmarks.ReadClient  -a 192.168.100.211  -s 64 -k 100000

Btw, I'm using a Broadcom 25G RoCE NIC this time. Thanks a lot. hibench_error.log

bench.log

caiqingsong commented 5 years ago

Hi Yuvaldeg,

Additional information: I tried SparkRDMA 3.1 and that error disappeared, but now there is a new error:

18/12/20 18:47:00 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 1002, SH-NIC-3, executor 2): java.lang.ClassNotFoundException: org.apache.spark.shuffle.rdma.RdmaSerializedShuffleHandle at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1867) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/20 18:47:00 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 1.0 (TID 1009, SH-NIC-3, executor 2): TaskKilled (stage cancelled) 18/12/20 18:47:00 INFO ibm.disni: createCompChannel, context 139688665345080 18/12/20 18:47:00 INFO ibm.disni: createCQ, objId 139678980252992, ncqe 
4352 18/12/20 18:47:00 INFO ibm.disni: createQP, objId 139678980255392, send_wr size 4096, recv_wr_size 256 Exception in thread "main" org.apache.spark.SparkException: Job aborted. at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:107) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1003) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:994) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:994) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:994) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:982) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$1.apply(PairRDDFunctions.scala:982) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$1.apply(PairRDDFunctions.scala:982) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:981) at com.intel.hibench.sparkbench.micro.ScalaTeraSort$.main(ScalaTeraSort.scala:60) at com.intel.hibench.sparkbench.micro.ScalaTeraSort.main(ScalaTeraSort.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 1008, SH-NIC-3, executor 2): java.lang.ClassNotFoundException: org.apache.spark.shuffle.rdma.RdmaSerializedShuffleHandle at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at 
java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1867) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2075) at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:88) ... 32 more Caused by: java.lang.ClassNotFoundException: org.apache.spark.shuffle.rdma.RdmaSerializedShuffleHandle at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1867) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85) Thanks, Qingsong

petro-rudenko commented 5 years ago

Hi Qingsong, please make sure you are using the jar-with-dependencies jar in your spark.executor.extraClassPath. Do you use the jar from the release page, or did you build your own? Can you please share your HiBench configuration?
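For reference, the classpath wiring in spark-defaults.conf usually looks like the sketch below. The jar path and file name here are placeholders for illustration; point both entries at the jar-with-dependencies file your build produced (the shuffle manager class name matches the package seen in your stack trace):

spark.driver.extraClassPath    /opt/sparkrdma/spark-rdma-jar-with-dependencies.jar
spark.executor.extraClassPath  /opt/sparkrdma/spark-rdma-jar-with-dependencies.jar
spark.shuffle.manager          org.apache.spark.shuffle.rdma.RdmaShuffleManager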

Thanks, Peter

caiqingsong commented 5 years ago

Hi Peter,

Thanks for your feedback. I built the jar on my own, but did not modify anything.

For the HiBench configuration, please see the attachment. hibench.txt

BR, Qingsong

petro-rudenko commented 5 years ago

@caiqingsong thanks, can you also provide the spark.conf you are using? Also make sure you build SparkRDMA for your Spark version and use the jar-with-dependencies fat jar (see the example below).
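A minimal sketch of the build step, assuming a standard Maven layout with a per-Spark-version profile for 2.2.0 (check the repository's pom.xml for the exact profile name on your branch):

mvn -DskipTests clean package -Pspark-2.2.0

This should produce a target/*-jar-with-dependencies.jar that goes on both the driver and executor classpaths.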

caiqingsong commented 5 years ago

Hi Peter,

Thanks for your comments. With the original jar, I can run HiBench over RDMA within two nodes, but the performance looks bad. The first line below is over TCP, the second over RDMA; any comments would be appreciated.

Type               Date       Time     Input_data_size Duration(s) Throughput(bytes/s) Throughput/node
ScalaSparkTerasort 2018-12-27 19:26:22 189000000       1147.530    164701              82350
ScalaSparkTerasort 2018-12-27 19:52:16 189000000       1110.398    170209              85104

Btw, as I understand it, there are two concepts: one is Spark over RDMA, the other is Hadoop over RDMA. Is the performance related to Hadoop over RDMA?

Thanks again, Qingsong

caiqingsong commented 5 years ago

Hi Peter,

Regarding the spark.conf, please see the attachment. spark.txt

Thanks, Qingsong

petro-rudenko commented 5 years ago

@caiqingsong thanks,

  1. Hadoop over RDMA (HDFS over RDMA) is a project we're working on. This repository is for SparkRDMA, which doesn't depend on Hadoop; you can actually use it without Hadoop, with only a Spark standalone installation.
  2. To improve performance, you need to increase the number of executors and the number of cores per executor. You have only 2 executors, so only 50% of the data is shuffled through the network (see the example below).
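For instance, a sketch of how the executor sizing could be bumped in the HiBench configuration. The values are illustrative and depend on your node sizes; the two hibench.yarn.* keys are the ones used later in this thread, and spark.executor.memory is an additional knob you may need to adjust to match:

hibench.yarn.executor.num    16
hibench.yarn.executor.cores  16
spark.executor.memory        16g
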
caiqingsong commented 5 years ago

Hi Petro,

Thanks for your great support.

It looks like just increasing the options below does not make a difference; is there anything I've missed?

Best Regards, Qingsong

hibench.yarn.executor.num   16
hibench.yarn.executor.cores 16

caiqingsong commented 5 years ago

Hi Petro,

I saw the Mellanox community published the data below; was this result from Hadoop over RDMA or Spark over RDMA?

Type               Date       Time     Input_data_size Duration(s) Throughput(bytes/s) Throughput/node
ScalaSparkTerasort 2018-03-26 19:13:52 189000000000    79.931      2364539415          2364539415
ScalaSparkTerasort 2018-03-26 19:17:13 189000000000    52.166      3623049495          3623049495

Thanks very much, Qingsong

petro-rudenko commented 5 years ago

@caiqingsong the results are for SparkRDMA. How many physical nodes does your cluster have?

caiqingsong commented 5 years ago

Hi Peter, only two nodes now.

BR, Qingsong

petro-rudenko commented 5 years ago

@caiqingsong in this case only about 50% of your shuffle traffic goes through the network; the rest is local. In general, with N nodes roughly (N-1)/N of the shuffle data crosses the network, so a 2-node cluster exercises the interconnect the least.

caiqingsong commented 5 years ago

Hi Peter,

So, from your point of view, are my results expected with two nodes?

And how many nodes should I use for a basic test?

Best Regards, Qingsong

petro-rudenko commented 5 years ago

@caiqingsong in the results we describe on the README page, we used 7 instances with an InfiniBand interconnect. You can start with 3-5 physical nodes, but try to run a bigger workload size (e.g., bigdata).
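As a rough sketch, assuming the standard HiBench scale knob in conf/hibench.conf (check your HiBench version for the exact key name and the accepted profile values):

hibench.scale.profile bigdata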

caiqingsong commented 5 years ago

Hi Peter,

Thanks for your great support.
Closed.

Happy new year! Qingsong