Mellanox / SparkRDMA

This is archive of SparkRDMA project. The new repository with RDMA shuffle acceleration for Apache Spark is here: https://github.com/Nvidia/sparkucx
Apache License 2.0
240 stars 70 forks source link

Steam is corrupted when shuffle read with RDMA Shuffle Manager #34

Open tobegit3hub opened 5 years ago

tobegit3hub commented 5 years ago

We have setup RDMA environment and run the Spark jobs with RDMA Shuffle Manager. Here is the Spark command to submit the job.

${SPARK_HOME}/bin/spark-submit \
        --executor-memory 7200m \
        --master yarn \
        --num-executors 40 \
        --files /root/spark-benchmark/tools/SparkRDMA/libdisni.so,/root/spark-benchmark/tools/SparkRDMA/spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --executor-cores 1 \
        --deploy-mode cluster \
        --driver-memory 4g \
        --conf spark.eventLog.dir=hdfs:///spark_benchmark/event_log/2.3.0 \
        --conf spark.yarn.jars=hdfs:///spark_benchmark/dependency/spark230_jars/*.jar \
        --conf spark.shuffle.spill.compress=false \
        --conf spark.executor.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --conf spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager \
        --conf spark.driver.extraLibraryPath=./ \
        --conf spark.hadoop.yarn.timeline-service.enabled=false \
        --conf spark.eventLog.enabled=true \
        --conf spark.driver.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --conf spark.executor.extraLibraryPath=./ \
        --conf spark.executor.memoryOverhead=128g \
        --conf spark.shuffle.compress=true \
        --conf spark.sql.shuffle.partitions=340 \
        --conf spark.task.maxFailures=1 \
        --conf spark.yarn.maxAppAttempts=1 \
        --class com.example.SparkApp

This script may work at times. But sometimes it may cause the fail task although the task may re-schedule and complete finally. Here is the error log of the fail task. Since we have set spark.shuffle.compress=true and the default compressor Lz4Codec will throw exception of Stream is corrupted.

org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
    at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)
    at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
    at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:68)
    at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:124)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    ... 8 more

If we set spark.shuffle.compress=false, the error will be throw by readFully when try to unserialized the steam to row object.

Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 223, lnode6.leap.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: reached end of stream after reading 723048 bytes; 825504310 bytes expected
    at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    ... 8 more

The SparkApp is simple. We read the data from HDFS and do some transformation before calling saveAsTextFile to save the data in HDFS. The fail task always happen in the final stage.

By the way, this issue may be not easy to reproduce. But in our environment, we have use the custom codec to save the RDD which may raise the probability about this issue. The codec can not get the stream or break the stream before RDMA Shuffle Manager doing the shuffle read, so we still have no clue about the root cause of this issue.

petro-rudenko commented 5 years ago

This seems to be an issue with LZ4 codec: https://issues.apache.org/jira/browse/SPARK-18105

Can you please try to set ( lzf OR snappy OR zstd) to spark.io.compression.code

tobegit3hub commented 5 years ago

Thanks @petro-rudenko . We have ty lz4 codec in this scenario and it works with SparkRDMA.

tobegit3hub commented 5 years ago

By the way, we got another error after shuffle read when compression was disable. So it is definitely not the issue of the compression implementation.

petro-rudenko commented 5 years ago

What's the error you got?

tobegit3hub commented 5 years ago

@petro-rudenko The error message for disable compression was in the description of this issue. The third code snippet of https://github.com/Mellanox/SparkRDMA/issues/34#issue-465161133 .

tobegit3hub commented 5 years ago

We have communicated with the exports of FPGA and they said that the initialization of each FPGA process will request and reset some memory which may affect the data of shuffle read. We have seen that the data was corrupt with many bytes were 0 which looks like a fresh memory.

I'm not sure if it is related yet. But the RDMA Shuffle Manager may check or print the data of shuffle read. If it is not corrupted in the internal of RDMA Shuffle Manager but corrupted in the Spark executor's memory which may be initialized by FPGA code, it could be the issue of the client-size memory management.

We have seem the official Spark shuffle manager has the steam check for each shuffle, is it possible to add the similiar code in RDMA shuffle manager? @petro-rudenko

petro-rudenko commented 5 years ago

@tobegit3hub yes, i saw this feature. We're in the process of migrating SparkRDMA to more performant network backend based on ucx library. There we woudn't reimplement default spark iterator, rather all logic would be in the ShuffleClient. I'll check how easy would be to add corruption checker here. cc @yosefe

tobegit3hub commented 5 years ago

Thanks @petro-rudenko . That may help and we can test for the new client in our environment.