Lobo2008 closed this issue 2 years ago.
Hi @Lobo2008, it is a little complicated. There are a lot of details regarding these options. If you do not use Dynamic Allocation, I would suggest setting spark.shuffle.service.enabled to false: since you have Remote Shuffle Service, you do not need Spark's own shuffle service.
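For illustration, a minimal sketch of what that suggests for spark-defaults.conf (the RssShuffleManager class name is taken from the error logs later in this thread; other RSS-specific options are omitted):

    # route shuffle data through Remote Shuffle Service
    spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager
    # RSS holds the shuffle data, so Spark's external shuffle service is unnecessary
    spark.shuffle.service.enabled=false
    # no dynamic allocation in this scenario
    spark.dynamicAllocation.enabled=false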
Thank you! No more questions about spark.shuffle.service for now, but as a next step I will start to use dynamic allocation.

I have now enabled spark.dynamicAllocation.xxxx (with spark.shuffle.service.enabled=true), but it still fails:
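For reference, the combination being attempted here is roughly the following (a sketch reconstructed from this message and the error below):

    spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager
    spark.dynamicAllocation.enabled=true
    # executors must register with Spark's built-in external shuffle service,
    # which does not recognize RssShuffleManager; hence the failure below
    spark.shuffle.service.enabled=true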
22/02/22 14:17:22 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 1000, initial allocation : 200) intervals
[Stage 0:> (0 + 0) / 2]
[Stage 0:> (0 + 1) / 2]
[Stage 0:> (0 + 2) / 2]
22/02/22 14:18:19 ERROR YarnClusterScheduler: Lost executor 2 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/xitong/appcache/application_1644546216413_0295/blockmgr-4e210579-6a16-4426-ab18-5ac9c556a4ce], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
22/02/22 14:18:20 ERROR YarnClusterScheduler: Lost executor 4 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/xitong/appcache/application_1644546216413_0295/blockmgr-37e73108-5b6e-475d-a359-5cb8033a757e], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
    (stack trace identical to the one above)
+1 to this issue, I'm also wondering if RSS is compatible with spark.dynamicAllocation.enabled = true (and spark.shuffle.service.enabled = false)
Spark requires spark.shuffle.service.enabled to be true when using dynamic allocation.
Yes, for Spark 2.4, Spark itself requires spark.shuffle.service.enabled to be true when using dynamic allocation. If you need dynamic allocation while using Remote Shuffle Service, you still need to run Spark's own External Shuffle Service.

It is different for Spark 3.x, though. Spark 3.x has shuffle tracking and supports dynamic allocation without requiring spark.shuffle.service.enabled to be true. You could use dynamic allocation with Remote Shuffle Service, without Spark's own External Shuffle Service.
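A minimal sketch of that Spark 3.x combination, assuming RSS is configured as elsewhere in this thread:

    spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager
    spark.dynamicAllocation.enabled=true
    # no external shuffle service needed on Spark 3.x
    spark.shuffle.service.enabled=false
    # let the driver track shuffle state instead of relying on a shuffle service
    spark.dynamicAllocation.shuffleTracking.enabled=true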
Does this mean that dynamic allocation + Remote Shuffle Service is incompatible for Spark 2.4? Because I don't think you can set spark.shuffle.service.enabled to true and spark.shuffle.manager to org.apache.spark.shuffle.RssShuffleManager at the same time?
Context : https://github.com/uber/RemoteShuffleService/issues/26
Also thank you so much for the responses!
There are a lot of details. Would you like to share more context, like your scenario or what you are trying to achieve by using Remote Shuffle Service? Then we could discuss further how Remote Shuffle Service could help you.
Hi @hiboyang, sorry for the very late reply here. My company runs a lot of Spark workloads on Spark 2.4, but we are actively migrating to Spark 3.2, so I'm hoping to use RemoteShuffleService for both. It looks like there's no backwards compatibility, but I was able to upgrade the Spark 3.1 RemoteShuffleService to Spark 3.2 and it seems to be working for basic Spark apps at the moment. I'm wondering again about dynamic allocation, however.
Do I need to set spark.dynamicAllocation.shuffleTracking.enabled to true? This seems to be the case from the Spark docs (https://spark.apache.org/docs/latest/configuration.html), but it looks like under these circumstances, even if an executor is done working and has written its files to RemoteShuffleService, it would be kept alive.
For Spark 3.1 and 3.2, you could set spark.dynamicAllocation.shuffleTracking.enabled to true, and also set a very small value for spark.dynamicAllocation.shuffleTracking.timeout (e.g. spark.dynamicAllocation.shuffleTracking.timeout=1). In that case, the Spark driver will release executors very quickly.
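Putting that together, a minimal sketch for Spark 3.1/3.2 dynamic allocation with RSS (the timeout of 1 is the aggressive value suggested above; adjust as needed):

    spark.dynamicAllocation.enabled=true
    spark.shuffle.service.enabled=false
    spark.dynamicAllocation.shuffleTracking.enabled=true
    # shuffle data lives in RSS rather than on executors, so executors that
    # only hold shuffle files can be released almost immediately
    spark.dynamicAllocation.shuffleTracking.timeout=1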
Hi, I am running a WordCount on Spark 2.4.5. Uber RSS is built from the master branch and deployed in ZooKeeper mode, following the README.

1) Spark is running on YARN/Hadoop. Some important configs that I didn't change in spark-defaults.conf:

2) YARN/Hadoop has two versions, 2.7.2 and 3.2.1, each with only 1 ResourceManager and 1 NodeManager running on different nodes (configured in the yarn-site.xml of both versions). Only one version of YARN/Hadoop is running at a time, and spark-2.4.5-yarn-shuffle.jar is on their classpath.

Then I change some configs in spark-defaults.conf:

1. YARN/Hadoop 2.7.2
With spark.shuffle.service.enabled=false it works fine, but when it is true it fails.

2. YARN/Hadoop 3.2.1
When spark.shuffle.service.enabled=true, and also when it is false, it throws:
com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
as above.

OR, should spark.shuffle.service.enabled always be false, because Uber RSS has done the job?