uber / RemoteShuffleService

Remote shuffle service for Apache Spark to store shuffle data on remote servers.

Disk damage causes failure #89

Open Lobo2008 opened 1 year ago

Lobo2008 commented 1 year ago

Hi, RSS_1 runs with -rootDir /data_01/rss-data/ ... RSS_i runs with -rootDir /data_i/rss-data/. When a disk, e.g. /data_03, is damaged, the Spark app fails.

The code shows that spark.shuffle.rss.replicas (default=1) is used to avoid issues like this. But why does the app still fail?

Will decreasing spark.shuffle.rss.serverRatio (default=20) and increasing spark.shuffle.rss.replicas (e.g. to 2) work? Or what should I do when one or more of these disks are damaged?

# exception
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 104, DataBlockSyncWriteClient 32 [/xxxx:50682 -> rss03.xxxx.net/xxxx:12222 (rss03.xxxx.net)], SocketException (Connection reset)
    at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
    at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
    at 
 // code of the configs
  val replicas: ConfigEntry[Int] =
    ConfigBuilder("spark.shuffle.rss.replicas")
      .doc("number of replicas for replicated shuffle client.")
      .intConf
      .createWithDefault(1)
  val serverRatio: ConfigEntry[Int] =
    ConfigBuilder("spark.shuffle.rss.serverRatio")
      .doc("how many executors mapping to one shuffle server.")
      .intConf
      .createWithDefault(20)
hiboyang commented 1 year ago

Would you try spark.shuffle.rss.replicas=2?

Lobo2008 commented 1 year ago

> Would you try spark.shuffle.rss.replicas=2?

Thanks @hiboyang, it works! Seems replicas=1 actually means just the original data itself, with no extra replication; replicas=2 is ok.

Lobo2008 commented 1 year ago

Hi @hiboyang , I set replicas=2 but another exception is thrown:

When mapper-A sends data to StreamServer5 and a replica to StreamServer3:

[image: exception screenshot]

hiboyang commented 1 year ago

Hi @Lobo2008, this is good testing! Looks like RSS needs to support this scenario where a rootDir becomes unavailable. The RSS client should mark that server as failed in that case and switch to another server if spark.shuffle.rss.replicas is larger than 1. This should be a TODO item for RSS.
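The failover behavior described here can be sketched as follows (a hypothetical sketch, not the actual RSS client code): write each block to every replica server, mark a server as failed when a write throws, and only abort when no replica remains. Under this scheme, replicas=1 leaves no server to fail over to, which matches the behavior reported above.

```java
import java.util.List;
import java.util.ArrayList;
import java.util.function.Consumer;

// Hypothetical sketch (not the actual RSS client) of the failover behavior
// described above: write to all replica servers, mark a server failed on
// error, and keep going as long as at least one replica write succeeds.
class ReplicatedWriterSketch {
    private final List<Consumer<byte[]>> clients;   // one writer per replica server
    private final boolean[] failed;

    ReplicatedWriterSketch(List<Consumer<byte[]>> clients) {
        this.clients = clients;
        this.failed = new boolean[clients.size()];
    }

    /** Returns the number of replicas the block was written to. */
    int writeDataBlock(byte[] block) {
        int ok = 0;
        for (int i = 0; i < clients.size(); i++) {
            if (failed[i]) continue;                // server already marked bad
            try {
                clients.get(i).accept(block);
                ok++;
            } catch (RuntimeException e) {
                failed[i] = true;                   // mark server failed, do not abort
            }
        }
        if (ok == 0) throw new RuntimeException("all replica servers failed");
        return ok;
    }
}
```

With replicas=2 a single bad disk only removes one writer, so the block still lands on the surviving server.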

hiboyang commented 1 year ago

By the way @Lobo2008, I want to double check: would you expand the +details for the first block of exceptions to see whether there is more of a clue?

[screenshot: Screen Shot 2022-11-03 at 10:53:00 AM]

Lobo2008 commented 1 year ago

Hi @hiboyang, I'm posting 2 apps with different exceptions; both failed because of StreamServer5.

application-1 failed on job10-stage14; 26 tasks (including retry attempts) failed for the same reason, the only difference being "hit exception writing heading|data bytes xxx":

# hit exception writing heading bytes xxx
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 124, DataBlockSyncWriteClient 106 [/10.203.89.173:46914 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
    at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
    at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
    at com.uber.rss.clients.ServerIdAwareSyncWriteClient.writeDataBlock(ServerIdAwareSyncWriteClient.java:73)
    at com.uber.rss.clients.ReplicatedWriteClient.lambda$writeDataBlock$2(ReplicatedWriteClient.java:82)
    at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
    at com.uber.rss.clients.ReplicatedWriteClient.writeDataBlock(ReplicatedWriteClient.java:78)
    at com.uber.rss.clients.MultiServerSyncWriteClient.writeDataBlock(MultiServerSyncWriteClient.java:124)
    at com.uber.rss.clients.LazyWriteClient.writeDataBlock(LazyWriteClient.java:99)
    at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:166)
    at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:161)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.shuffle.RssShuffleWriter.sendDataBlocks(RssShuffleWriter.scala:161)
    at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:119)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
    at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:131)
    ... 22 more

# hit exception writing data yyy
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing data 279, DataBlockSyncWriteClient 279 [/10.203.70.110:53497 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
    at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:141)
    at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
    at com.uber.rss.clients.ServerIdAwareSyncWriteClient.writeDataBlock(ServerIdAwareSyncWriteClient.java:73)
    at com.uber.rss.clients.ReplicatedWriteClient.lambda$writeDataBlock$2(ReplicatedWriteClient.java:82)
    at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
    at com.uber.rss.clients.ReplicatedWriteClient.writeDataBlock(ReplicatedWriteClient.java:78)
    at com.uber.rss.clients.MultiServerSyncWriteClient.writeDataBlock(MultiServerSyncWriteClient.java:124)
    at com.uber.rss.clients.LazyWriteClient.writeDataBlock(LazyWriteClient.java:99)
    at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:166)
    at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:161)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.shuffle.RssShuffleWriter.sendDataBlocks(RssShuffleWriter.scala:161)
    at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:119)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at com.uber.rss.util.ByteBufUtils.readBytesToStream(ByteBufUtils.java:73)
    at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:139)
    ... 22 more

application-2 failed on job3-stage4; one task failed across 4 attempts:

# one of the four
com.uber.rss.exceptions.RssFinishUploadException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
    at com.uber.rss.clients.MultiServerSyncWriteClient.finishUpload(MultiServerSyncWriteClient.java:131)
    at com.uber.rss.clients.LazyWriteClient.finishUpload(LazyWriteClient.java:116)
    at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:123)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.uber.rss.exceptions.RssFinishUploadException: Failed to finish upload to server 15, DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], RssNetworkException (writeMessageLengthAndContent failed: DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net.net)], SocketException (Broken pipe)). If the network is good, this error may indicate your shuffle data exceeds the server side limit. This shuffle client has written 77 bytes.
    at com.uber.rss.clients.DataBlockSyncWriteClient.finishUpload(DataBlockSyncWriteClient.java:165)
    at com.uber.rss.clients.ShuffleDataSyncWriteClientBase.finishUpload(ShuffleDataSyncWriteClientBase.java:89)
    at com.uber.rss.clients.ServerIdAwareSyncWriteClient.finishUpload(ServerIdAwareSyncWriteClient.java:78)
    at com.uber.rss.clients.ReplicatedWriteClient.lambda$finishUpload$3(ReplicatedWriteClient.java:88)
    at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
    at com.uber.rss.clients.ReplicatedWriteClient.finishUpload(ReplicatedWriteClient.java:88)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.uber.rss.exceptions.RssNetworkException: writeMessageLengthAndContent failed: DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
    at com.uber.rss.clients.ClientBase.writeMessageLengthAndContent(ClientBase.java:234)
    at com.uber.rss.clients.ClientBase.writeControlMessageNotWaitResponseStatus(ClientBase.java:248)
    at com.uber.rss.clients.ClientBase.writeControlMessageAndWaitResponseStatus(ClientBase.java:252)
    at com.uber.rss.clients.DataBlockSyncWriteClient.finishUpload(DataBlockSyncWriteClient.java:159)
    ... 14 more
Caused by: java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
    at com.uber.rss.clients.ClientBase.writeMessageLengthAndContent(ClientBase.java:228)
    ... 17 more
hiboyang commented 1 year ago

Thanks @Lobo2008 for the debugging info! I checked the source code again. The code in RSS is supposed to try another server if it hits an error with one server, including a disk write failure. Seems that part is not working as expected. Let's see whether Uber folks have an environment to test and debug this.

Lobo2008 commented 1 year ago

> Thanks @Lobo2008 for the debugging info! I checked the source code again. The code in RSS is supposed to try another server if it hits an error with one server, including a disk write failure. Seems that part is not working as expected. Let's see whether Uber folks have an environment to test and debug this.

Thanks @hiboyang , one more question:

We have 55 StreamServers and set spark.shuffle.rss.serverRatio=5. If application-i uses 300 executors, that means selectedServerCount = 300/5 = 60.

Since 60 > 55, this app will select all 55 StreamServers, so no matter how Spark itself retries the job/stage/task, this app is destined to fail if one disk crashes. Is this right?
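The arithmetic in this question can be sketched like this (the selection formula is an assumption based on the serverRatio doc string earlier in the thread; the real RSS selection logic may differ):

```java
// Hypothetical sketch of the server-selection count discussed above:
// roughly one shuffle server per serverRatio executors, capped at the
// number of servers in the cluster.
class ServerSelectionSketch {
    static int selectedServerCount(int numExecutors, int serverRatio, int totalServers) {
        int wanted = numExecutors / serverRatio;   // 300 / 5 = 60
        return Math.min(wanted, totalServers);     // min(60, 55) = 55: every server is used
    }
}
```

Once the cap is hit, every server is in the app's selection, so any single disk failure touches the app.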

hiboyang commented 1 year ago

Yes, if that replicas setting does not work for you.

Another option: you could use the spark.shuffle.rss.excludeHosts setting to exclude the server with the bad disk.
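The effect of such an exclude list can be sketched as follows (hypothetical; only the config name comes from the comment above, and the actual parsing in RSS, e.g. the separator, may differ):

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical sketch of what an excludeHosts filter does: drop the servers
// listed in the config value from the candidate list before selection.
// The comma-separated format shown here is an assumption.
class ExcludeHostsSketch {
    static List<String> filter(List<String> servers, String excludeHosts) {
        Set<String> excluded = new HashSet<>(Arrays.asList(excludeHosts.split(",")));
        return servers.stream()
                .filter(s -> !excluded.contains(s))
                .collect(Collectors.toList());
    }
}
```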

Lobo2008 commented 1 year ago

> Yes, if that replicas setting does not work for you.
>
> Another option: you could use the spark.shuffle.rss.excludeHosts setting to exclude the server with the bad disk.

Thanks, but we may not need this setting; for now we're going to run a monitor service that kills the RSS process if a disk crashes.
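A minimal version of such a disk probe could look like this (a hypothetical sketch of the monitor idea above; killing the RSS process on a failed probe is left out):

```java
import java.io.File;
import java.io.IOException;

// Hypothetical sketch of the monitor described above: probe the rootDir with
// a small file creation; if the write fails, the disk is treated as damaged
// and the RSS process on that host should be stopped (kill step omitted).
class DiskProbeSketch {
    static boolean diskHealthy(File rootDir) {
        File probe = new File(rootDir, ".rss-disk-probe");
        try {
            if (!rootDir.isDirectory()) return false;
            if (!probe.createNewFile() && !probe.exists()) return false;
            return probe.delete() || !probe.exists();
        } catch (IOException e) {
            return false;    // write failure -> disk likely damaged
        }
    }
}
```

Run this periodically against each -rootDir; a false result means the disk backing that rootDir can no longer take writes.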