apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

Discuss HDFS data locality in Kubernetes #206

Open kimoonkim opened 7 years ago

kimoonkim commented 7 years ago

A few weeks ago, I prototyped a way to put HDFS namenode and datanodes in our kubernetes cluster. (See PR on apache-spark-on-k8s/Kubernetes-HDFS)

I am hoping the setup can help us find out exactly how HDFS data locality is broken in k8s. I wrote a Google doc with the research, http://tiny.pepperdata.com/k8s-hdfs-locality. PTAL. I am posting the summary below as well for a quick read. The doc has diagrams and more details. Please share your comments in the Google doc since it's much easier to have in-line discussions there.


HDFS locality, broken in Kubernetes

HDFS data locality relies on matching executor host names against datanode host names:

  1. Node locality: If the host name of an executor matches that of a given datanode, it means the executor can read the data from the local datanode.
  2. Rack locality: The namenode has topology information, which is a list of host names with rack names as satellite values. If the namenode can retrieve an entry in its topology information using the executor host name as the key, then we can determine the rack that the executor resides in. This means rack locality will work, i.e. the executor can read data from datanodes in the same rack.

Node locality (1) is broken in Kubernetes. Each executor pod is assigned a virtual IP and thus a virtual host name. This will not match the datanodes’ physical host names.

Similarly, rack locality (2) is broken. The executor host name will fail to match any topology entry in the namenode, so we’ll fail to determine the rack name.
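
To make the failure concrete, here is a minimal sketch of host-name-based locality classification. It is not Spark or HDFS code; the host names, rack names, and the `locality_level` helper are all hypothetical, and the level names only mirror Spark's `NODE_LOCAL` / `RACK_LOCAL` / `ANY`.

```python
# Hypothetical namenode topology: datanode host name -> rack name.
TOPOLOGY = {
    "node-1.example.com": "/rack-a",
    "node-2.example.com": "/rack-b",
    "node-3.example.com": "/rack-a",
}


def locality_level(executor_host, block_hosts):
    """Classify how close an executor is to the datanodes holding a block."""
    # Node locality: the executor host name equals a datanode host name.
    if executor_host in block_hosts:
        return "NODE_LOCAL"
    # Rack locality: the executor host name must be a key in the topology.
    executor_rack = TOPOLOGY.get(executor_host)
    if executor_rack is not None and any(
        TOPOLOGY.get(host) == executor_rack for host in block_hosts
    ):
        return "RACK_LOCAL"
    return "ANY"


block_hosts = {"node-1.example.com"}
on_same_node = locality_level("node-1.example.com", block_hosts)
on_same_rack = locality_level("node-3.example.com", block_hosts)
# A pod's virtual host name matches neither a datanode nor a topology entry.
in_a_pod = locality_level("spark-exec-1", block_hosts)
```

With a physical host name the first two calls find node or rack locality; with the pod's virtual host name both checks fail and the executor is treated as having no locality at all.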

Locality-aware layers

Here, we look at existing locality-aware layers and figure out how to fix the locality.

When Spark reads data from HDFS, it can increase the read throughput by sending tasks to the executors that can access the needed disk data on the same node or another node on the same rack. This locality-aware execution is implemented in three different layers:

  1. Executor scheduling: When the Spark driver launches executors, it may ask the cluster scheduler to consider a list of preferred candidate hosts and racks. The driver gets this list by asking the namenode which datanode hosts hold the application's input data. (In YARN, the optimization in this layer is triggered only when Spark dynamic allocation is enabled.) See Appendix A for the detailed code snippets.
  2. Task dispatching: Once it gets executors, the Spark driver dispatches tasks to them. For each task, the driver tries to send it to an executor that has the task's partition on local disk or in the local rack. To do this, the driver builds hosts-to-tasks and racks-to-tasks mappings from the datanode info returned by the namenode, and later looks up those maps using executor host names. See Appendix B for details.
  3. Reading HDFS partitions: When a task actually runs on an executor, it reads its partition block from a datanode host. The HDFS read library asks the namenode for multiple candidate datanode hosts, each of which has a copy of the data. The namenode sorts the result by proximity to the client host, and the client picks the first entry in the returned list. See Appendix C for details.

Layer (1) is not necessarily broken for k8s because we can probably use k8s node selection to express the preference. However, the correct implementation only exists in YARN-related code. We’ll need to generalize and reuse the code in the right way for k8s.

Layer (2) is broken. The Spark driver will build the mappings using the datanode host names. Then it will look up the maps using the executor pod host names, which will never match.
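
A rough illustration of why the lookup never matches. This is only a sketch of the hosts-to-tasks idea, not the actual Spark scheduler code; `build_pending_tasks_by_host` and all host names are made up for the example.

```python
from collections import defaultdict


def build_pending_tasks_by_host(task_block_hosts):
    """Map each datanode host name to the ids of tasks whose partition it stores."""
    pending = defaultdict(list)
    for task_id, hosts in task_block_hosts.items():
        for host in hosts:
            pending[host].append(task_id)
    return pending


# Hypothetical input: task id -> datanode hosts holding that task's partition.
task_block_hosts = {0: ["node-1.example.com"], 1: ["node-2.example.com"]}
pending = build_pending_tasks_by_host(task_block_hosts)

# An executor that reports a physical host name finds its local task.
local_tasks_physical = pending.get("node-1.example.com", [])
# An executor pod reports its own pod host name, which is never a key.
local_tasks_pod = pending.get("spark-exec-1", [])
```

The second lookup comes back empty even if the pod happens to run on node-1, so the driver cannot prefer that executor for task 0.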

Layer (3) is also broken. The namenode will see the executor pod name as the client host name and compare it against datanode host names to sort the datanode list. The resulting list will not be in the correct order.
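
The sorting step can be sketched as follows. This is a simplified stand-in for the namenode's proximity sort, not the Hadoop implementation; the three-level weight, the `sort_by_proximity` helper, and the host names are assumptions for illustration.

```python
def sort_by_proximity(client_host, replica_hosts, topology):
    """Order replica datanodes from nearest to farthest from the client."""
    client_rack = topology.get(client_host)

    def weight(datanode):
        if datanode == client_host:
            return 0  # same node
        if client_rack is not None and topology.get(datanode) == client_rack:
            return 1  # same rack
        return 2  # off-rack, or client unknown to the topology

    # sorted() is stable, so ties keep their original order.
    return sorted(replica_hosts, key=weight)


# Hypothetical topology and replica list.
topology = {"node-1": "/rack-a", "node-2": "/rack-b", "node-3": "/rack-b"}
replicas = ["node-1", "node-3", "node-2"]

order_for_node_client = sort_by_proximity("node-2", replicas, topology)
order_for_pod_client = sort_by_proximity("spark-exec-1", replicas, topology)
```

For a client known as node-2 the local replica moves to the front. For a client known only by its pod name every replica gets the same weight, so the client reads from whichever replica happens to be listed first.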

kimoonkim commented 7 years ago

Thanks for the discussion today. I have updated the "How to fix" section based on the discussion. PTAL.

kimoonkim commented 7 years ago

In the SIG meeting, we discussed the namenode topology plugin. I found a markdown file, RackAwareness.md, explaining how it works. The config key is net.topology.node.switch.mapping.impl and the Java interface for plugins is org.apache.hadoop.net.DNSToSwitchMapping. Please see the markdown file for more details.
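
For readers unfamiliar with the plugin contract: the Java interface's `resolve` method takes a list of host names or IPs and returns one network path per entry. Below is a Python analog of that contract only, not a usable Hadoop plugin. `StaticTopologyMapping` and its table are hypothetical; `/default-rack` is Hadoop's default rack name.

```python
DEFAULT_RACK = "/default-rack"


class StaticTopologyMapping:
    """Python analog of a topology plugin backed by a fixed lookup table."""

    def __init__(self, host_to_rack):
        self._host_to_rack = dict(host_to_rack)

    def resolve(self, names):
        # One network path per input name, in the same order; unknown
        # names fall back to the default rack.
        return [self._host_to_rack.get(name, DEFAULT_RACK) for name in names]


mapping = StaticTopologyMapping({"10.0.0.1": "/rack-a", "10.0.0.2": "/rack-b"})
paths = mapping.resolve(["10.0.0.2", "10.244.1.5"])
```

A k8s-aware plugin would replace the fixed table with a lookup that places a pod IP under the network path of the node hosting it.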

kimoonkim commented 7 years ago

Double posting of kubernetes-HDFS issue 5 for people following only here.


As a follow-up, I have applied our HDFS to a K8s cluster in Google Container Engine (GKE). Please see this doc describing the experience, http://tiny.pepperdata.com/hdfs-k8s-gke, for details. The high level summary is:

kimoonkim commented 7 years ago

We are conducting experiments for HDFS in K8s. The first preliminary result is available. See http://tiny.pepperdata.com/hdfs-k8s-experiment. Quoting the overview here for a quick read, but please leave comments/questions in the doc for easier inline discussions:


We ran Spark TeraGen and TeraSort jobs in a K8s cluster with HDFS running in it. The test subject is HDFS with the locality optimization, which consists of the Spark driver fix and the namenode topology plugin. The baseline is a bare-bones HDFS without the locality optimization.

We report the preliminary result here. We saw the locality optimization removes a significant amount of unnecessary network use.

Caveat: The Google Cloud network has a very high network bandwidth-to-disk bandwidth ratio. In this setup, the locality optimization does not translate into shorter job running time. As such, there is no meaningful practical benefit from the locality optimization in the Google Cloud network. However, on other networks where the network-to-disk bandwidth ratio is lower (e.g. typical on-premise networks with host-local storage) the locality optimization has proven critical. This initial experiment was performed on Google Cloud because of the ease of setup, but future experiments will be performed on other environments. We expect these future experiments will show reduction in job running time.

kimoonkim commented 7 years ago

Doing this experiment allowed me to learn a few things. These are independent of GKE, so it's good that we learned them early on.

  1. The namenode topology plugin approach does not work with the default namenode config, because the default locality sorting code inside the namenode does not handle a client IP properly when it does not match any datanode IP. It won't even consider the client's network path that our plugin returns. It turned out there is alternative sorting code that is also pluggable, called org.apache.hadoop.net.NetworkTopologyWithNodeGroup, which has a more general sorting mechanism. I had to configure this and a few other related options. I'll make sure to document this in the upcoming topology code and http://tiny.pepperdata.com/k8s-hdfs-locality. We were blind to this in part because the analysis in the Google doc only looked at the namenode trunk code, whereas the Hadoop 2.7 we are using has a very different code path.
  2. The existing topology plugin prototype retrieves the pod-IP-to-node mapping from GKE routes. It turned out the same information is available in the Kubernetes node object as the pod CIDR. I think we can easily switch the code to use this pod CIDR. I am hoping this will make the code work for both GKE and AWS at least, if not more.
  3. The driver task dispatching code in #216 has a minor bug. In GKE, k8s node names are only short names and do not match FQDNs. Datanode host names are FQDNs, and the current code tries to match them using short names. (This doesn't invalidate the experiment because we happened to use only three nodes and HDFS creates three copies of data. Every node has a local copy, so the task dispatching code does not matter.) I think the fix is simply appending the domain name and looking up again. I'll play with this in a follow-up PR.
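
Items 2 and 3 above can be sketched roughly as follows. This is not the prototype's code: the helper names, node names, CIDR values, and domain are made up, and the only assumption about Kubernetes is that each node object exposes a pod CIDR.

```python
import ipaddress


def node_for_pod_ip(pod_ip, node_pod_cidrs):
    """Return the node whose pod CIDR contains pod_ip, or None if no node does."""
    addr = ipaddress.ip_address(pod_ip)
    for node_name, cidr in node_pod_cidrs.items():
        if addr in ipaddress.ip_network(cidr):
            return node_name
    return None


def lookup_with_domain(tasks_by_host, node_name, domain):
    """Look up by node name; if that misses and the name is short, retry as an FQDN."""
    tasks = tasks_by_host.get(node_name)
    if tasks is None and "." not in node_name:
        tasks = tasks_by_host.get(f"{node_name}.{domain}")
    return tasks or []


# Hypothetical values, as they might be read from the node objects.
node_pod_cidrs = {"node-1": "10.244.0.0/24", "node-2": "10.244.1.0/24"}
# Datanode host names are FQDNs, while the node names above are short names.
tasks_by_host = {"node-2.c.example.internal": [7, 8]}

node = node_for_pod_ip("10.244.1.17", node_pod_cidrs)
tasks = lookup_with_domain(tasks_by_host, node, "c.example.internal")
```

The first helper needs nothing provider-specific, which is why the pod CIDR looks more portable than GKE routes. The second only retries when the node name has no dot, so a node name that is already an FQDN is never rewritten.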

kimoonkim commented 7 years ago

@foxish @kow3ns @lins05

I started playing with AWS EC2 for further HDFS experiments. FYI, I am using kops to create k8s clusters on EC2, which seems to work quite well.

But there are many choices we can make about the test setup. EC2 has many different instance types. Also, kops provides multiple network plugins ranging from kubenet, which is what GKE uses, to a few 3rd party overlay/non-overlay networks like weave and calico.

I think it's best for me to write an experiment proposal document first so that we can discuss how to go about these choices. What do you guys think?

kimoonkim commented 7 years ago

Wrote the EC2 experiment proposal doc. PTAL.

kimoonkim commented 7 years ago

Updated http://tiny.pepperdata.com/hdfs-k8s-experiment with the EC2 result. In the EC2 setup, the network is much slower than the disk read/write speed. We ran TeraValidate twice, with and without the data locality optimization code (PR #216) in the Spark driver.

See the chart below. The first run is without data locality and the second with data locality. The run with data locality finished much faster: ~7 mins vs. ~11 mins. This is because disk read throughput increased dramatically, as all reads are local and not bound by the slow network.

[Chart: baseline vs. locality, EC2 d2.xlarge, 250 GB TeraValidate]
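
As a back-of-the-envelope reading of the two run times quoted above (both are approximate, so the derived figures are approximate too):

```python
baseline_min = 11.0  # approximate run time without data locality
locality_min = 7.0   # approximate run time with data locality

speedup = baseline_min / locality_min
reduction = (baseline_min - locality_min) / baseline_min

summary = f"{speedup:.2f}x faster, {reduction:.0%} less run time"
print(summary)  # 1.57x faster, 36% less run time
```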

kimoonkim commented 7 years ago

Updated http://tiny.pepperdata.com/k8s-hdfs-locality with findings on which k8s network providers would need the namenode layer fix for data locality.

In summary, it is only Google Cloud GKE which uses the kubenet network:

We can say the role of the namenode topology plugin is not as important as we originally thought. I'll finalize the current prototype and send a PR to https://github.com/apache-spark-on-k8s/kubernetes-HDFS soon.

foxish commented 7 years ago

@kimoonkim, do you have benchmark data of a similar nature for GKE? Is there a doc containing that?

kimoonkim commented 7 years ago

@foxish

@kimoonkim, do you have benchmark data of a similar nature for GKE? Is there a doc containing that?

Yes, please see the GKE experiment section on http://tiny.pepperdata.com/hdfs-k8s-experiment. It's not using the same TeraValidate job that the EC2 experiment used, so the result is not as easy to understand. Maybe I should run a new experiment using TeraValidate on GKE too.

ash211 commented 7 years ago

@kimoonkim with the latest PR merged (executor locality) what is left to do in this umbrella issue?

kimoonkim commented 7 years ago

@ash211 The only remaining item is rack locality support. I am writing a PR. After that, I think we can close this issue!

echarles commented 7 years ago

(PR already closed and merged - Is there a better place to discuss the following question?)

From "Spark/HDFS data locality optimization" [*], I understand that the IP addresses of datanode pods have to be bound to the node IP address, which makes me think that it should be managed by kubenet.

What if I want to run HDFS datanodes and Spark executors both managed by a CNI plugin (e.g. Calico)? Will the current pod-CIDR topology be effective, or do I have to configure the CNI plugin in some way?

[*] https://docs.google.com/document/d/1TAC6UQDS3M2sin2msFcZ9UBBQFyyz4jFKWw5BM54cQo/edit

kimoonkim commented 6 years ago

@echarles

Asking the questions here is fine. We have a sister repo, https://github.com/apache-spark-on-k8s/kubernetes-HDFS/, that could have been better for this particular question.

What if I want to run HDFS datanodes and Spark executors both managed by a CNI plugin (e.g. Calico)? Will the current pod-CIDR topology be effective, or do I have to configure the CNI plugin in some way?

For all k8s network plugins, the current k8s HDFS locality code in the Spark driver will do the right thing. The driver will find the cluster nodes that the executors are running on. Then it can correctly associate the datanodes on the same nodes with those executors.

That covers the driver side. We should also consider the HDFS namenode side, however. Will pod packets from executors appear with node IPs to the namenode so that the namenode can also associate them with datanodes? The answer is yes for most k8s network plugins, as they perform IP masquerading, i.e. Network Address Translation (NAT), when pod packets head outside the pod IP subnet. Calico can also do NAT when its nat-outgoing option is properly configured. We have documented this in detail at https://github.com/apache-spark-on-k8s/kubernetes-HDFS/tree/master/topology. Here's the Calico-related part. For more details, please see the doc:

Calico is a popular non-overlay network provider. It turns out Calico can also be configured to do NAT between the pod subnet and the node subnet thanks to the nat-outgoing option. The option can be easily turned on and is enabled by default.
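
A toy model of what the namenode sees, under the assumptions stated above. This does not reflect how any particular network plugin is implemented; the function name, addresses, and the single `nat_outgoing` flag are hypothetical simplifications.

```python
import ipaddress


def source_ip_seen_by_peer(pod_ip, node_ip, dest_ip, pod_subnet, nat_outgoing):
    """Return the source address a packet carries when it reaches dest_ip."""
    dest_in_pod_subnet = (
        ipaddress.ip_address(dest_ip) in ipaddress.ip_network(pod_subnet)
    )
    # Masquerading only applies to traffic leaving the pod IP subnet.
    if nat_outgoing and not dest_in_pod_subnet:
        return node_ip
    return pod_ip


# Hypothetical addresses: the namenode listens on a node-subnet IP.
pod_ip, node_ip, namenode_ip = "10.244.1.17", "10.0.0.12", "10.0.0.5"
pod_subnet = "10.244.0.0/16"

seen_with_nat = source_ip_seen_by_peer(pod_ip, node_ip, namenode_ip, pod_subnet, True)
seen_without_nat = source_ip_seen_by_peer(pod_ip, node_ip, namenode_ip, pod_subnet, False)
```

With NAT the namenode sees the node IP, which it can match to the datanode on that node. Without NAT it sees the pod IP, which is the case where a topology plugin is still needed.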

echarles commented 6 years ago

Thx @kimoonkim. Further discussion on this is at apache-spark-on-k8s/kubernetes-HDFS#29 and apache-spark-on-k8s/kubernetes-HDFS#30.