uber / RemoteShuffleService

Remote shuffle service for Apache Spark to store shuffle data on remote servers.

Can Rss have stage retry when one server is down? #93

Open YutingWang98 opened 1 year ago

YutingWang98 commented 1 year ago

Hi, I just found out my spark job got killed with this error:

Caused by: com.uber.rss.exceptions.RssException: Failed to get node data for zookeeper node: /spark_rss/{cluster}/default/nodes/{server_host_name}
      at com.uber.rss.metadata.ZooKeeperServiceRegistry.getServerInfo(ZooKeeperServiceRegistry.java:231)
      ...
      at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
  Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /spark_rss/{cluster}/default/nodes/{server_host_name}
      at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
      ...
      at com.uber.rss.metadata.ZooKeeperServiceRegistry.getServerInfo(ZooKeeperServiceRegistry.java:228)

I then checked ZooKeeper and didn't see this {server_host_name} registered there. So I suspect that it had already been removed from ZooKeeper due to some internal issue with the node, but had been picked up by Rss before this happened. When Rss tried to connect, the node was no longer in ZooKeeper, which caused the 'NoNodeException'. It retried and failed 4 times, and then the job was killed.

If this was the reason, maybe Rss needs to allow the connection process to skip nodes that are no longer in ZooKeeper and pick a currently available one? Any thoughts would be appreciated, thanks!
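A minimal sketch of the behaviour proposed above, for illustration only. It is not how RSS works today: the function name, the host names, and the `registered` set (a stand-in for a snapshot of the node names currently present in ZooKeeper) are all hypothetical, and no real ZooKeeper call is made.

```python
from typing import Iterable, List, Set


def pick_registered_servers(candidates: Iterable[str],
                            registered: Set[str],
                            needed: int) -> List[str]:
    """Return the first `needed` candidates that are still registered.

    `registered` is a hypothetical snapshot of the nodes currently present
    in the service registry; candidates missing from it are skipped
    instead of causing a failure.
    """
    picked = [host for host in candidates if host in registered][:needed]
    if len(picked) < needed:
        raise RuntimeError(
            f"only {len(picked)} of {needed} requested servers are registered")
    return picked


candidates = ["rss-1", "rss-2", "rss-3", "rss-4"]
registered = {"rss-1", "rss-3", "rss-4"}  # rss-2 has lost its heartbeat
print(pick_registered_servers(candidates, registered, needed=3))
# ['rss-1', 'rss-3', 'rss-4']
```

This only covers the moment at which servers are selected. It does not help once data has already been written to a server that later goes down.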

hiboyang commented 1 year ago

You are right: the server must have gone down and been removed from ZooKeeper after it lost its heartbeat with ZooKeeper.

The current RSS implementation assigns a static list of servers at the beginning of the Spark application and does not change the server list while the application is running. This simplifies the logic for tracking which partition is on which server. The downside is that if one RSS server goes down in the middle of a run, the application will fail.

If you want redundancy, you could set "spark.shuffle.rss.replicas" to keep multiple replicas, so that if one server holding a shuffle partition goes down, another server still has the data and the application will not fail.
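To make the trade-off concrete, here is a toy model of a static partition-to-server assignment with and without replicas. The round-robin placement and all names are assumptions made for illustration; the thread does not describe how RSS actually places partitions.

```python
from typing import Dict, List, Set


def assign_with_replicas(servers: List[str], num_partitions: int,
                         replicas: int) -> Dict[int, List[str]]:
    """Give each partition `replicas` distinct servers, fixed up front.

    Placement is round-robin over consecutive servers (an assumption).
    """
    if not 1 <= replicas <= len(servers):
        raise ValueError("replicas must be between 1 and the server count")
    n = len(servers)
    return {p: [servers[(p + r) % n] for r in range(replicas)]
            for p in range(num_partitions)}


def lost_partitions(assignment: Dict[int, List[str]],
                    down: Set[str]) -> List[int]:
    """Partitions for which every assigned server is down."""
    return [p for p, hosts in assignment.items()
            if all(h in down for h in hosts)]


servers = ["rss-1", "rss-2", "rss-3"]
single = assign_with_replicas(servers, num_partitions=6, replicas=1)
double = assign_with_replicas(servers, num_partitions=6, replicas=2)
print(lost_partitions(single, down={"rss-2"}))  # [1, 4]
print(lost_partitions(double, down={"rss-2"}))  # []
```

With one replica, every partition placed on the failed server is lost. With two, each of those partitions still has a surviving copy in this model.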

YutingWang98 commented 1 year ago

Thank you for the suggestions @hiboyang ! Does this mean the shuffle data written to the server will be doubled if I set 'spark.shuffle.rss.replicas' to 2? If so, this will work with some small jobs, but may require more disk for some really large jobs.

Also, currently we want to use Rss on a cluster level, so using the default values of 1 to 50 wouldn't make sense for all the jobs. Do you have any advice on how to specify "spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount" for different spark jobs? Thanks!

YutingWang98 commented 1 year ago

Hi, @hiboyang. If 'spark.shuffle.rss.replicas' does write double the amount of data to the servers, we unfortunately won't be able to use it for large jobs with 400+ TB of shuffle data.
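The storage concern can be put in numbers with a short calculation. It assumes that each replica stores a full copy of the shuffle data, which is the premise of the question and is not confirmed in this thread; the function name is hypothetical.

```python
def stored_shuffle_tb(shuffle_tb: float, replicas: int) -> float:
    """Total shuffle data held across RSS servers, in TB.

    Assumes every replica is a full copy of the shuffle output.
    """
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    return shuffle_tb * replicas


for replicas in (1, 2):
    print(replicas, stored_shuffle_tb(400, replicas))
# 1 400
# 2 800
```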

So do you think we could just skip the unreachable server and pick the next available one if this happens in the write stage? And if a server goes down during the read stage, can Rss do what the Spark external shuffle service does and trigger a stage retry to recover the data?

hiboyang commented 1 year ago

The right values for "spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount" depend on the Spark job (e.g. how many mappers/reducers, how much shuffle data, etc.), so they have to be chosen case by case. Normally you could start with the default values (or not set them at all), then change the values and see whether there is any impact on your jobs.
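One way to try different values per job is to generate the `--conf` flags for `spark-submit` from a small helper. This is a sketch: the helper name and the example numbers are hypothetical, and only the three configuration keys come from this thread.

```python
from typing import List, Optional


def rss_conf_flags(min_servers: Optional[int] = None,
                   max_servers: Optional[int] = None,
                   replicas: Optional[int] = None) -> List[str]:
    """Build spark-submit --conf flags for the RSS settings that are set.

    Settings left as None are omitted, so the RSS defaults apply.
    """
    if (min_servers is not None and max_servers is not None
            and min_servers > max_servers):
        raise ValueError("min_servers must not exceed max_servers")
    settings = {
        "spark.shuffle.rss.minServerCount": min_servers,
        "spark.shuffle.rss.maxServerCount": max_servers,
        "spark.shuffle.rss.replicas": replicas,
    }
    flags: List[str] = []
    for key, value in settings.items():
        if value is not None:
            flags += ["--conf", f"{key}={value}"]
    return flags


# Example values are arbitrary, not recommendations.
print(" ".join(rss_conf_flags(min_servers=5, max_servers=20)))
# --conf spark.shuffle.rss.minServerCount=5 --conf spark.shuffle.rss.maxServerCount=20
```

Calling `rss_conf_flags()` with no arguments returns an empty list, which corresponds to the "start with the defaults" advice above.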

hiboyang commented 1 year ago

I see. This is a current limitation: RSS cannot handle a server going down while a Spark job is running unless 'spark.shuffle.rss.replicas' is used. Hope someone could contribute this feature!

YutingWang98 commented 1 year ago

Thanks for the reply! Will see what I can do to improve this.

YutingWang98 commented 1 year ago

@hiboyang Hi! I attempted to contribute to adding stage retry, but there seems to be a difficulty due to the implementation of Rss. Wondering if I can have some insights from you.

The issue is the amount of tasks to retry. When there is a server down, and we want to do stage retry to recover the files on this server,

So, due to the different map-reduce structure in Ess, I suppose the only way to have fault tolerance without using replicas is to retry the entire stage instead of a few tasks? Please correct me if I have misunderstood this. Do you think there is a better way to retry and recover the partition files? Thanks!

hiboyang commented 1 year ago

Hi @YutingWang98 , you are right! This is a limitation of the current RSS. Maybe we could brainstorm ideas sometime on how to improve this.

mayurdb commented 1 year ago

Hi @YutingWang98 , you are right. Given the push nature of RSS, all the tasks in the stage have to be retried to deal with a server going down. This is how we have solved it internally at Uber:

  1. Throw a fetch failed exception from the RSS client (from both map and reduce tasks) when connectivity to an RSS server is lost
  2. Upon receiving a fetch failure from the RSS clients, clear all map outputs of the tasks completed so far
  3. Re-trigger the shuffle planning on the server: call ShuffleManager.registerShuffle()
  4. Pick a new set of available RSS servers to execute the particular shuffle
  5. Retry all the tasks from the stage

We had to add a patch in Spark code to handle 2 and 3. Also there are cases of laggard tasks from the previous stage finishing after the re-attempt of the stage has started that needed to be handled.
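The five steps above can be sketched as a toy simulation. This is not the Uber patch or Spark code: `ToyDriver`, `FetchFailed`, the `crashes` hook, and the host names are all hypothetical, and laggard tasks from an earlier attempt are not modelled.

```python
from typing import Dict, List, Optional, Set


class FetchFailed(Exception):
    """Stand-in for the fetch failure raised by the RSS client (step 1)."""


class ToyDriver:
    def __init__(self, all_servers: List[str], servers_per_shuffle: int,
                 crashes: Optional[Dict[int, str]] = None):
        self.all_servers = all_servers
        self.servers_per_shuffle = servers_per_shuffle
        # task id -> server that goes down just before that task runs
        self.crashes = dict(crashes or {})
        self.down: Set[str] = set()
        self.servers: List[str] = []
        self.map_outputs: Dict[int, str] = {}  # task id -> server with its output

    def register_shuffle(self) -> None:
        """Steps 3 and 4: plan the shuffle again on servers that are up."""
        alive = [s for s in self.all_servers if s not in self.down]
        if len(alive) < self.servers_per_shuffle:
            raise RuntimeError("not enough RSS servers available")
        self.servers = alive[:self.servers_per_shuffle]

    def run_task(self, task_id: int) -> None:
        crashed = self.crashes.pop(task_id, None)
        if crashed is not None:
            self.down.add(crashed)
        server = self.servers[task_id % len(self.servers)]
        if server in self.down:
            raise FetchFailed(server)  # step 1
        self.map_outputs[task_id] = server

    def run_stage(self, num_tasks: int, max_attempts: int = 4) -> int:
        """Run all tasks of the stage; return the number of attempts used."""
        for attempt in range(1, max_attempts + 1):
            self.register_shuffle()
            try:
                for task_id in range(num_tasks):  # step 5 on re-attempts
                    self.run_task(task_id)
                return attempt
            except FetchFailed:
                self.map_outputs.clear()  # step 2
        raise RuntimeError("stage failed after all attempts")


driver = ToyDriver(["rss-1", "rss-2", "rss-3"], servers_per_shuffle=2,
                   crashes={3: "rss-2"})
print(driver.run_stage(num_tasks=4))  # 2
print(driver.map_outputs)
# {0: 'rss-1', 1: 'rss-3', 2: 'rss-1', 3: 'rss-3'}
```

In this run, rss-2 goes down just before task 3 of the first attempt. The outputs of tasks 0 to 2 are discarded, the shuffle is planned again on rss-1 and rss-3, and all four tasks are rerun.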

This has been working out quite well for us. @hiboyang @YutingWang98 I can put up a formal design doc as well, where we can brainstorm more ideas. Let me know your thoughts.

YutingWang98 commented 1 year ago

Hi @mayurdb, thank you for the reply, and sharing your implementation! I have a question here:

If the spark stages are cascading, then one stage may depend on the previous stage's shuffle output. In this case, the retry should be recursive, and almost like rerunning the whole job. So besides retrying tasks from the current stage (what you mentioned in point 5), do you need to retry all the previous stages as well?

Looking forward to reading more details on your design doc, and we can sure discuss more about this! Thank you.

hiboyang commented 1 year ago

@mayurdb, this is pretty cool, thanks for sharing how you fixed this issue inside Uber! Is it possible to share the patch you made to the Spark code?

mayurdb commented 1 year ago

@hiboyang @YutingWang98 I have created a pull request with these changes. Can you take a look? https://github.com/uber/RemoteShuffleService/pull/97/files

YutingWang98 commented 1 year ago

@mayurdb Thank you for sharing it, will take a look!