apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

Use the driver pod IP address for spark.driver.bindAddress #533

Closed: liyinan926 closed this 6 years ago

liyinan926 commented 6 years ago

What changes were proposed in this pull request?

This PR attempts to fix the issue reported in #523 that may happen if the driver tries to bind to the driver host name before the endpoint controller modifies the DNS configuration.

Changes: the submission client stops setting spark.driver.bindAddress based on the name of the headless service for the driver in DriverAddressConfigurationStep, which this PR renames to DriverServiceBootstrapStep. Instead, this PR introduces a new environment variable, SPARK_DRIVER_BIND_ADDRESS, that gets its value from status.podIP using the downward API. At runtime, the value of SPARK_DRIVER_BIND_ADDRESS is therefore the driver pod's IP address. We can then pass -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS in the Dockerfile to give spark.driver.bindAddress the right value. The submission client still sets spark.driver.host to the driver service DNS name, though.

Tested on a GKE cluster using the SparkPi example. Verified that the following showed up in the driver container:

SPARK_DRIVER_BIND_ADDRESS=10.44.2.26

And the driver pod YAML contained the following:

- name: SPARK_DRIVER_BIND_ADDRESS
  valueFrom:
    fieldRef:
      apiVersion: v1
      fieldPath: status.podIP
- name: SPARK_JAVA_OPT_9
  value: -Dspark.driver.host=spark-pi-1508880029167-driver-svc.default.svc.cluster.local
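
A minimal Python sketch of the wiring described above: it reads SPARK_DRIVER_BIND_ADDRESS (populated from status.podIP by the downward API) and produces the JVM option that pins spark.driver.bindAddress to the pod IP. The helper name `driver_java_opts` is hypothetical and not part of this PR; in the actual image the shell expands the variable directly in the Dockerfile command.

```python
import os


def driver_java_opts(env=None):
    """Build the JVM option that pins spark.driver.bindAddress to the pod IP.

    Hypothetical helper for illustration only: the real image lets the shell
    expand $SPARK_DRIVER_BIND_ADDRESS in the Dockerfile command.
    """
    env = os.environ if env is None else env
    bind_address = env.get("SPARK_DRIVER_BIND_ADDRESS")
    if not bind_address:
        # Without the downward-API variable there is nothing to pin.
        return []
    return [f"-Dspark.driver.bindAddress={bind_address}"]


# Value taken from the driver container output shown above.
print(driver_java_opts({"SPARK_DRIVER_BIND_ADDRESS": "10.44.2.26"}))
```

Because the value is an IP literal resolved by the kubelet when the pod starts, the driver does not need a DNS lookup to bind.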

@foxish @mccheah @kimoonkim

liyinan926 commented 6 years ago

rerun integration tests please

mccheah commented 6 years ago

Has this been tested in production clusters?

mccheah commented 6 years ago

Particularly with SPARK-21642 also merged.

mccheah commented 6 years ago

Actually I wanted to circle back to this and what we concluded from https://github.com/apache-spark-on-k8s/spark/issues/523.

From the comments in the issue, it sounds like the problem is a race condition: the Endpoints object for the driver's service has not yet been created when the binding occurs.

I wonder if we can just fix the race condition itself. Can the scheduler backend's initialization have a Watch that blocks until the endpoint object is ready before attempting to bind to the given hostname? That seems more idiomatic and fixes the actual underlying problem, rather than working with IP address magic. But there are pros and cons either way.

If we use the IP address, is kube-dns a requirement for this code path? I think we still rely on kube-dns to resolve the K8s master hostname. But relying less on kube-dns seems beneficial anyways.

kimoonkim commented 6 years ago

@mccheah Excellent questions.

> Can the scheduler backend's initialization have a Watch that blocks until the endpoint object is ready before attempting to bind to the given hostname?

This might also work. However, it could be difficult to block the binding until the endpoint is ready. It depends on which Spark core classes are in charge of binding the ports (I think there are actually two: the scheduler backend port and the block manager port). Personally, I do not know which core classes handle the binding.

> If we use the IP address, is kube-dns a requirement for this code path?

The IP address approach allows the driver to avoid a dependency on kube-dns. But executors still use the service name to connect to the driver, so I think we still need kube-dns for this feature to work. Note it is still possible for executors to come up before the service endpoint is created. But this race condition is very very unlikely.

> I think we still rely on kube-dns to resolve the K8s master hostname

For the k8s master hostname, we are relying on the underlying bare-metal DNS, not kube-dns. kube-dns is only in charge of service-to-pod mappings.

mccheah commented 6 years ago

> it is still possible for executors to come up before the service endpoint is created. But this race condition is very very unlikely.

If we wanted, we could prevent this by polling for the Endpoints resource's readiness in the scheduler backend. We know for sure that the scheduler needs to initialize before it requests any executors, unlike the general case, where we don't know when the driver will attempt to bind to an address.

Having it bind to IP address instead seems fine, but I think we want to make sure this is tested with SPARK-21642 merged. If someone can custom build and test Spark with the appropriate patches then that would be fantastic. Something like what @ash211 did with https://github.com/apache-spark-on-k8s/spark/pull/484.

liyinan926 commented 6 years ago

I'm testing a custom build with changes from SPARK-21642. Will report back later.

liyinan926 commented 6 years ago

> If we wanted, we could prevent this by polling for the Endpoints resource's readiness in the scheduler backend. We know for sure that the scheduler needs to initialize before it requests any executors, unlike the general case, where we don't know when the driver will attempt to bind to an address.

I agree the scheduler backend should block and wait for the Endpoints resource to become ready before launching executor pods.
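
The polling idea discussed here can be sketched in Python as follows. This is a minimal illustration, not the scheduler backend's code: `fetch` is a hypothetical callable that returns the driver service's Endpoints object as a dict (or None if it does not exist yet), and the readiness rule assumed is "at least one subset lists a ready address".

```python
import time


def endpoints_ready(endpoints):
    """Return True if an Endpoints object lists at least one ready address."""
    for subset in endpoints.get("subsets") or []:
        if subset.get("addresses"):
            return True
    return False


def wait_for_endpoints(fetch, timeout_s=30.0, interval_s=0.5,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll fetch() until the Endpoints object is ready or the timeout expires.

    fetch is a hypothetical callable supplied by the caller; sleep and clock
    are injectable so the loop can be exercised without real waiting.
    """
    deadline = clock() + timeout_s
    while True:
        endpoints = fetch()
        if endpoints is not None and endpoints_ready(endpoints):
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Simulated API responses: missing, then empty, then ready.
responses = iter([
    None,
    {"subsets": []},
    {"subsets": [{"addresses": [{"ip": "10.44.2.26"}]}]},
])
print(wait_for_endpoints(lambda: next(responses), interval_s=0))
```

The scheduler backend would call something like `wait_for_endpoints` once, before requesting its first executor pod, which is the ordering guarantee the comments above rely on.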

mccheah commented 6 years ago

That would require a minor and very subtle API break. We allow the driver to be provided specific credentials so that the driver can have only the minimum privilege to create/delete pods. Now, the driver will also need privileges to read the status of endpoints in its namespace. I don't see this as a significant concern but I am noting it here as a reference.

liyinan926 commented 6 years ago

There seems to be an issue with changes from this PR and changes from SPARK-21642. I got the following exception when running the SparkPi example:

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
    at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:32)
    at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:300)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:314)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:223)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    ... 4 more
Caused by: java.io.IOException: Failed to connect to spark-pi-1508813143731-driver:7078
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)

The problem is that when the executor started up, CoarseGrainedExecutorBackend tried to create a SparkEnv, which called RpcUtils.makeDriverRef. makeDriverRef constructs the driver endpoint using RpcAddress(driverHost, driverPort), where driverHost gets its value from spark.driver.host. With the changes from SPARK-21642, spark.driver.host by default resolves to the hostname of the driver pod, which is by default the name of the pod, i.e., spark-pi-1508813143731-driver. That's why it failed with "Failed to connect to spark-pi-1508813143731-driver:7078".
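
To illustrate the failure, here is a simplified Python model of how the executor ends up with its connection target. It is a sketch under stated assumptions, not Spark's code: `driver_rpc_address` is a hypothetical function, and the service name in the second call is a made-up example that follows the naming pattern seen elsewhere in this thread.

```python
def driver_rpc_address(conf, pod_hostname):
    """Model the host:port an executor dials to reach the driver.

    Assumption from this thread: when spark.driver.host is unset, it
    defaults to the driver pod's hostname (the pod name by default).
    """
    host = conf.get("spark.driver.host", pod_hostname)
    port = int(conf["spark.driver.port"])
    return f"{host}:{port}"


pod_hostname = "spark-pi-1508813143731-driver"
conf = {"spark.driver.port": "7078"}

# No spark.driver.host passed to the executor: it dials the bare pod name,
# which is not resolvable from other pods.
print(driver_rpc_address(conf, pod_hostname))

# With spark.driver.host set to a (hypothetical) service DNS name, the
# executor goes through the driver service instead.
conf["spark.driver.host"] = "spark-pi-1508813143731-driver-svc.default.svc.cluster.local"
print(driver_rpc_address(conf, pod_hostname))
```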

The hostname of the driver pod can be customized using the hostname field of the pod spec. We could set it to the IP address of the driver pod in the scheduler backend, so that spark.driver.host takes the driver pod IP address, which would solve the issue. But I'm not sure whether this defeats the purpose of SPARK-21642.

liyinan926 commented 6 years ago

Actually, to solve the issue above, we just need to pass spark.driver.host, set to the driver service DNS name, to the executors. I will give it a try and report back.

liyinan926 commented 6 years ago

After failing to make the approach above work, I switched to a totally different approach that finally works. I tested the new solution on a GKE cluster with the changes from SPARK-21642 merged. Please see the updated PR description.

mccheah commented 6 years ago

I believe the main point of SPARK-21642 was to tie the driver to a hostname instead of an IP address. It seems like moving to set the host to an IP address is a regression in this behavior. Should we still be trying to make this work with the service hostname?

liyinan926 commented 6 years ago

We are still setting spark.driver.host to the driver service DNS name so the executors will still try to talk to the driver through the service. The only change is to set spark.driver.bindAddress to the actual pod IP address so it doesn't need a DNS resolution when the driver binds to that address.

kimoonkim commented 6 years ago

> I believe the main point of SPARK-21642 was to tie the driver to a hostname instead of an IP address.

I was curious why it does that. It is helpful to consider the behavior in two pieces:

  1. Executors connect to the driver using a hostname instead of an IP address.
  2. The driver binds to the service ports using a hostname instead of an IP address.

Is only (1) the point of SPARK-21642, or is (2) also important for it?

liyinan926 commented 6 years ago

@kimoonkim Very good points. IMO (1) is what SPARK-21642 really is all about. (2) is a side effect of it simply because spark.driver.bindAddress falls back to spark.driver.host if not set.
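
The fallback behind (2) can be sketched in Python. This is a simplified model of the two settings, not Spark's configuration code: the function names are hypothetical, and the default for spark.driver.host follows the behavior described earlier in this thread.

```python
import ipaddress


def resolve_driver_addresses(conf, local_hostname):
    """Simplified model of the two driver address settings.

    Assumptions: spark.driver.host defaults to the local hostname, and
    spark.driver.bindAddress falls back to spark.driver.host when unset.
    """
    host = conf.get("spark.driver.host", local_hostname)
    bind_address = conf.get("spark.driver.bindAddress", host)
    return host, bind_address


def is_ip_literal(address):
    """True if the address is an IP literal, so binding needs no DNS lookup."""
    try:
        ipaddress.ip_address(address)
        return True
    except ValueError:
        return False


service = "spark-pi-1508880029167-driver-svc.default.svc.cluster.local"

# If only spark.driver.host is set, the bind address falls back to the
# service name, so binding depends on DNS being ready.
print(resolve_driver_addresses({"spark.driver.host": service}, "driver-pod"))

# With this PR: executors still use the service name (1), while the driver
# binds to its pod IP, which decouples (2) from DNS.
conf = {"spark.driver.host": service, "spark.driver.bindAddress": "10.44.2.26"}
host, bind_address = resolve_driver_addresses(conf, "driver-pod")
print(host, bind_address, is_ip_literal(bind_address))
```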

mccheah commented 6 years ago

Looks ok apart from the minor comments.