apache / celeborn

Apache Celeborn is an elastic and high-performance service for shuffle and spilled data.
https://celeborn.apache.org/
Apache License 2.0

[CELEBORN-1533] Log location when CelebornInputStream#fillBuffer fails #2655

Closed. cxzl25 closed this 3 months ago

cxzl25 commented 3 months ago

What changes were proposed in this pull request?

This PR aims to log the location when CelebornInputStream#fillBuffer fails.
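
A minimal sketch of the pattern, with hypothetical names (Decompressor, currentLocation) standing in for the real Celeborn internals rather than the actual CelebornInputStream code: when decompression throws, log which location the corrupted data came from before rethrowing, so the error in the stack trace below can be traced back to a specific worker or partition file.

```java
// Self-contained sketch only; the Decompressor interface, the location string,
// and the method shape are illustrative stand-ins, not Celeborn's internals.
public class FillBufferLoggingSketch {

  /** Stand-in decompressor; may throw an unchecked exception on corrupt data. */
  interface Decompressor {
    int decompress(byte[] src, byte[] dst, int dstOffset);
  }

  static int fillBuffer(Decompressor decompressor,
                        byte[] compressedBuf,
                        byte[] rawDataBuf,
                        String currentLocation) {
    try {
      return decompressor.decompress(compressedBuf, rawDataBuf, 0);
    } catch (RuntimeException e) {
      // Log which location produced the bad data so the faulty worker or
      // partition file can be identified from the executor log alone.
      System.err.println("Decompress failed for location " + currentLocation);
      throw e;
    }
  }
}
```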

Why are the changes needed?

24/07/30 22:29:04,181 [Executor task launch worker for task 353.0 in stage 10.0 (TID 50641)] ERROR Executor: Exception in task 353.0 in stage 10.0 (TID 50641)
com.github.luben.zstd.ZstdException: Data corruption detected
    at com.github.luben.zstd.ZstdDecompressCtx.decompressByteArray(ZstdDecompressCtx.java:205)
    at com.github.luben.zstd.Zstd.decompressByteArray(Zstd.java:439)
    at org.apache.celeborn.client.compress.ZstdDecompressor.decompress(ZstdDecompressor.java:54)
    at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:563)
    at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:464)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
    at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:82)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:1065)

Does this PR introduce any user-facing change?

No

How was this patch tested?

GA

turboFei commented 3 months ago

Thanks, merged to main, branch-0.5, and branch-0.4.

turboFei commented 3 months ago

But it seems Celeborn should also support throwing a FetchFailedException when data corruption is detected.

Hi @cxzl25, would you like to follow up on this?

cxzl25 commented 3 months ago

It seems Celeborn should also support throwing a FetchFailedException when data corruption is detected.

In Celeborn we use ZstdDecompressCtx; when decompression fails, it throws a ZstdException, which does not inherit from IOException.

In Spark, ZstdInputStreamNoFinalizer is used instead, which throws a ZstdIOException that does inherit from IOException.

So this exception is not caught in CelebornInputStreamImpl#fillBuffer; we can address this in a follow-up.
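
One possible shape of that follow-up, sketched here with hypothetical names (this is not the committed fix): wrap the unchecked decompression error in an IOException so the existing IOException handling on the read path can surface it as a fetch failure and trigger a retry.

```java
import java.io.IOException;

// Hedged sketch only: the wrapping helper and names below are hypothetical,
// not Celeborn's actual follow-up change.
public class DecompressWrapSketch {

  /** Stand-in for com.github.luben.zstd.ZstdException, which is a RuntimeException. */
  static class CorruptionException extends RuntimeException {
    CorruptionException(String msg) { super(msg); }
  }

  static byte[] decompressChecked(byte[] compressed, String location) throws IOException {
    try {
      return doDecompress(compressed);
    } catch (RuntimeException e) {
      // Re-throw as IOException so callers that only handle IOException
      // (the path that can raise FetchFailedException) still see the failure.
      throw new IOException("Data corruption detected at location " + location, e);
    }
  }

  private static byte[] doDecompress(byte[] compressed) {
    // Placeholder for the real zstd decompression call.
    throw new CorruptionException("Data corruption detected");
  }
}
```

Whether the conversion happens inside the decompressor or in fillBuffer itself is a design choice for that follow-up; the point is that the unchecked ZstdException needs to reach a path that can raise a FetchFailedException instead of failing the task outright.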