apache / incubator-uniffle

Uniffle is a high performance, general purpose Remote Shuffle Service.
https://uniffle.apache.org/
Apache License 2.0

[Bug] Failed to read data using netty #1816

Open dingshun3016 opened 3 weeks ago

dingshun3016 commented 3 weeks ago

Describe the bug

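When shuffle data is read over Netty, the shuffle server cannot find the local index file for the partition, yet the client still goes on to read the shuffle data. The task then fails with RssFetchFailedException ("Failed to read shuffle data from WARM handler"), caused by java.nio.BufferUnderflowException.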

Affects Version(s)

master

Uniffle Server Log Output

[2024-06-20 02:36:35,167] [INFO] [HadoopFlushEventThreadPool-46] [HadoopShuffleWriteHandler] [initialize] >>> User: u_yth, Path: hdfs://xxx/user/uniffle/gd17-02/shuffle_data/application_1718442507784_2497395_-779157490-31930/0/176-176
[2024-06-20 02:36:38,945] [DEBUG] [HadoopFlushEventThreadPool-46] [DefaultFlushEventHandler] [handleEventAndUpdateMetrics] >>> Flush event:ShuffleDataFlushEvent: eventId=8443984, appId=application_1718442507784_2497395_-779157490-31930, shuffleId=0, startPartition=176, endPartition=176, retryTimes=0, underStorage=HadoopStorage, isPended=false, ownedByHugePartition=false successfully in 3779 ms and release 1073814556 bytes
[2024-06-20 02:38:03,727] [INFO] [nioEventLoopGroup-3-260] [ShuffleServerNettyHandler] [operationComplete] >>> Successfully executed getMemoryShuffleData for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176]. Took 15 ms and retrieved 12291274 bytes of data
[2024-06-20 02:38:03,877] [INFO] [nioEventLoopGroup-3-259] [ShuffleServerNettyHandler] [operationComplete] >>> Successfully executed getMemoryShuffleData for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176]. Took 0 ms and retrieved 0 bytes of data
[2024-06-20 02:38:03,880] [WARN] [nioEventLoopGroup-3-259] [ShuffleServerNettyHandler] [handleGetLocalShuffleIndexRequest] >>> Index file for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176] is not found, maybe the data has been flushed to cold storage.
org.apache.uniffle.common.exception.FileNotFoundException: No such data in current storage manager.
        at org.apache.uniffle.server.ShuffleTaskManager.getShuffleIndex(ShuffleTaskManager.java:841) ~[shuffle-server-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.server.netty.ShuffleServerNettyHandler.handleGetLocalShuffleIndexRequest(ShuffleServerNettyHandler.java:360) [shuffle-server-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.server.netty.ShuffleServerNettyHandler.receive(ShuffleServerNettyHandler.java:85) [shuffle-server-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.common.netty.handle.TransportRequestHandler.handle(TransportRequestHandler.java:62) [rss-common-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.common.netty.handle.TransportChannelHandler.channelRead(TransportChannelHandler.java:100) [rss-common-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) [netty-handler-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at org.apache.uniffle.common.netty.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:80) [rss-common-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.109.Final.jar:4.1.109.Final]
        at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
[2024-06-20 02:38:38,922] [INFO] [nioEventLoopGroup-3-265] [ShuffleServerNettyHandler] [operationComplete] >>> Successfully executed getMemoryShuffleData for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176]. Took 25 ms and retrieved 12291274 bytes of data
[2024-06-20 02:38:39,267] [INFO] [nioEventLoopGroup-3-266] [ShuffleServerNettyHandler] [operationComplete] >>> Successfully executed getMemoryShuffleData for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176]. Took 0 ms and retrieved 0 bytes of data
[2024-06-20 02:38:39,270] [WARN] [nioEventLoopGroup-3-266] [ShuffleServerNettyHandler] [handleGetLocalShuffleIndexRequest] >>> Index file for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176] is not found, maybe the data has been flushed to cold storage.
org.apache.uniffle.common.exception.FileNotFoundException: No such data in current storage manager.
        at org.apache.uniffle.server.ShuffleTaskManager.getShuffleIndex(ShuffleTaskManager.java:841) ~[shuffle-server-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.server.netty.ShuffleServerNettyHandler.handleGetLocalShuffleIndexRequest(ShuffleServerNettyHandler.java:360) [shuffle-server-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.server.netty.ShuffleServerNettyHandler.receive(ShuffleServerNettyHandler.java:85) [shuffle-server-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.common.netty.handle.TransportRequestHandler.handle(TransportRequestHandler.java:62) [rss-common-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at org.apache.uniffle.common.netty.handle.TransportChannelHandler.channelRead(TransportChannelHandler.java:100) [rss-common-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) [netty-handler-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at org.apache.uniffle.common.netty.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:80) [rss-common-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.109.Final.jar:4.1.109.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.109.Final.jar:4.1.109.Final]
        at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
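The server log shows partition 176 being flushed to HDFS at 02:36, and both later WARNs say the local index is missing, "maybe the data has been flushed to cold storage". If so, a missing local index is an expected state, and the client should move on to the next tier rather than decode anything. The sketch below shows that control flow only. It assumes a hypothetical TierReader interface, IndexStatus enum and IndexResult holder, none of which are Uniffle's actual API. Tiers are tried in order, for example memory, then local file, then HDFS. A tier whose index is missing or empty is skipped instead of being handed to the splitter.

```java
import java.util.List;
import java.util.Optional;

class TieredReadSketch {
  /** HYPOTHETICAL status of an index request; not Uniffle's StatusCode. */
  enum IndexStatus { SUCCESS, NOT_FOUND }

  /** HYPOTHETICAL index response: a status plus the raw index bytes. */
  static final class IndexResult {
    final IndexStatus status;
    final byte[] index;

    IndexResult(IndexStatus status, byte[] index) {
      this.status = status;
      this.index = index;
    }
  }

  /** HYPOTHETICAL per-tier reader (e.g. memory, local file, HDFS). */
  interface TierReader {
    String name();

    IndexResult readIndex();
  }

  /** Returns the name of the first tier that has a usable index, or empty if none does. */
  static Optional<String> pickTier(List<TierReader> tiers) {
    for (TierReader tier : tiers) {
      IndexResult result = tier.readIndex();
      if (result.status == IndexStatus.NOT_FOUND || result.index.length == 0) {
        // No index on this tier: skip it rather than decoding an empty/error payload.
        continue;
      }
      return Optional.of(tier.name());
    }
    return Optional.empty();
  }

  /** Convenience factory for a tier that always returns the given result. */
  static TierReader tier(String name, IndexResult result) {
    return new TierReader() {
      public String name() {
        return name;
      }

      public IndexResult readIndex() {
        return result;
      }
    };
  }
}
```

With a WARM tier that reports NOT_FOUND followed by a COLD tier that returns an index, pickTier selects COLD. If every tier lacks an index, it returns an empty Optional, so the caller can fail explicitly instead of underflowing.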

Uniffle Engine Log Output

24/06/20 02:38:03 INFO [Executor task launch worker for task 9577] ShuffleServerGrpcNettyClient: GetInMemoryShuffleData from xxx:49999 for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176], lastBlockId[1476402704] cost 1 ms
24/06/20 02:38:03 INFO [Executor task launch worker for task 9577] ShuffleServerGrpcNettyClient: GetShuffleIndex from xxx:49999 for appId[application_1718442507784_2497395_-779157490-31930], shuffleId[0], partitionId[176 cost 1 ms
24/06/20 02:38:03 INFO [Executor task launch worker for task 9577] RssShuffleManager: Mark the task: 9577_0 failed.

24/06/20 02:38:03 ERROR [Executor task launch worker for task 9577] Executor: Exception in task 176.0 in stage 14.0 (TID 9577)
org.apache.uniffle.common.exception.RssFetchFailedException: Failed to read shuffle data from WARM handler
    at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:124)
    at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:134)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:308)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:216)
    at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:312)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:837)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:794)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:1033)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:1069)
    at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.writer.RssShuffleWriter.writeImpl(RssShuffleWriter.java:291)
    at org.apache.spark.shuffle.writer.RssShuffleWriter.write(RssShuffleWriter.java:272)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:135)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$runWithUgi$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381)
    at org.apache.spark.executor.Executor$TaskRunner.runWithUgi(Executor.scala:465)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:394)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.uniffle.common.exception.RssException: Read index data under flow
    at org.apache.uniffle.common.segment.LocalOrderSegmentSplitter.split(LocalOrderSegmentSplitter.java:144)
    at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:80)
    at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:113)
    ... 40 more
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:506)
    at java.nio.DirectByteBuffer.getLong(DirectByteBuffer.java:772)
    at org.apache.uniffle.common.segment.LocalOrderSegmentSplitter.split(LocalOrderSegmentSplitter.java:95)
    ... 42 more
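The innermost frames show ByteBuffer.getLong failing inside LocalOrderSegmentSplitter.split. In other words, the client tried to decode an index record from a buffer that had fewer bytes left than the record needs. The standalone sketch below reproduces that failure mode with plain java.nio and contrasts it with a length check that fails fast. It assumes a 40-byte record made of offset, length, uncompressed length, CRC, block ID and task attempt ID. That layout, and every class and method name here, is an assumption for illustration, not Uniffle's code. The trace uses a DirectByteBuffer, but a heap buffer underflows the same way.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class IndexUnderflowSketch {
  // ASSUMPTION: 40-byte index record = offset(8) + length(4) + uncompressLength(4)
  //             + crc(8) + blockId(8) + taskAttemptId(8). Illustrative only.
  static final int RECORD_SIZE = 8 + 4 + 4 + 8 + 8 + 8;

  /** HYPOTHETICAL decoded entry; only two fields are kept for brevity. */
  static final class Entry {
    final long offset;
    final int length;

    Entry(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /** Naive decoder: keeps reading records while any bytes remain. */
  static List<Entry> decodeNaive(ByteBuffer buf) {
    List<Entry> entries = new ArrayList<>();
    while (buf.hasRemaining()) {
      long offset = buf.getLong(); // throws BufferUnderflowException if < 8 bytes remain
      int length = buf.getInt();
      buf.getInt(); // uncompressLength (ignored here)
      buf.getLong(); // crc
      buf.getLong(); // blockId
      buf.getLong(); // taskAttemptId
      entries.add(new Entry(offset, length));
    }
    return entries;
  }

  /** Defensive decoder: rejects a truncated index before touching any record. */
  static List<Entry> decodeChecked(ByteBuffer buf) {
    int remaining = buf.remaining();
    if (remaining % RECORD_SIZE != 0) {
      throw new IllegalStateException(
          "Index data of " + remaining + " bytes is not a multiple of " + RECORD_SIZE);
    }
    return decodeNaive(buf);
  }

  public static void main(String[] args) {
    // One complete record followed by 4 stray bytes: a truncated index.
    ByteBuffer truncated = ByteBuffer.allocate(RECORD_SIZE + 4);
    try {
      decodeNaive(truncated.duplicate());
    } catch (BufferUnderflowException e) {
      System.out.println("naive decoder: BufferUnderflowException");
    }
    try {
      decodeChecked(truncated.duplicate());
    } catch (IllegalStateException e) {
      System.out.println("checked decoder: " + e.getMessage());
    }
  }
}
```

The naive loop consumes the first full record and then calls getLong with only 4 bytes left, which matches the shape of the trace above. The checked variant reports the bad length instead. It does not explain why the index buffer was short in the first place.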

Uniffle Server Configurations

No response

Uniffle Engine Configurations

No response

Additional context

No response

dingshun3016 commented 3 weeks ago

@rickyma Can you help me take a look?

rickyma commented 3 weeks ago

I've never encountered this. Do you have any idea? @zuston, I think you might be familiar with this BufferUnderflowException scenario.