apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

In-cluster client mode #456

Open sahilprasad opened 7 years ago

sahilprasad commented 7 years ago

What changes were proposed in this pull request?

As per the discussion on my original PR (#402), this allows client mode if the submission environment is within a Kubernetes cluster. I erroneously stated in that PR that the application resolves to the submission client at org.apache.spark.deploy.kubernetes.Client. That is not the case: submitted applications resolve to the user-submitted class for Java/Scala execution, and to org.apache.spark.deploy.PythonRunner for Python execution. Since execution takes place inside a Kubernetes driver pod (e.g. via kubectl run), I was able to get this to work after setting spark.kubernetes.driver.pod.name to the HOSTNAME environment variable within the pod. With this configuration, once the KubernetesClusterSchedulerBackend class is invoked by the application class, the pod that the user submitted the application from is recognized as the driver of the application and execution proceeds as normal, with no extra driver pod being unnecessarily created.

The in-cluster use case is more of a stopgap for actual client mode support on this project, but it enables client-mode applications like the PySpark and Scala shells, Jupyter, etc. The logic that determines whether the application was submitted from within a Kubernetes cluster simply checks whether KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT are set.
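To make the setup above concrete, here is a minimal sketch (not code from this PR) of how a driver running inside a pod might be configured; the in-cluster master URL and app name are assumptions for illustration:

// Minimal sketch, assuming in-cluster client mode as described above.
// Inside a pod, HOSTNAME defaults to the pod name, which is why it can be
// reused as spark.kubernetes.driver.pod.name.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("in-cluster-client-demo")                          // illustrative app name
  .setMaster("k8s://https://kubernetes.default.svc")             // assumed in-cluster API server address
  .set("spark.kubernetes.driver.pod.name", sys.env("HOSTNAME"))  // the pod we are running in
  .set("spark.executor.instances", "2")

val spark = SparkSession.builder.config(conf).getOrCreate()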

Would definitely appreciate comments or questions, especially around how to better detect in-cluster execution!

How was this patch tested?

Ran provided unit and integration tests. Manual testing done through spark-submit, standalone PySpark scripts, PySpark on Jupyter, and both shells.

tristanz commented 7 years ago

Can we add a basic usage example to the docs: https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/docs/running-on-kubernetes.md?

sahilprasad commented 7 years ago

@tristanz added docs. Did I leave anything out?

foxish commented 7 years ago

The docs are actually published separately out of https://github.com/apache-spark-on-k8s/userdocs. I think this change is useful, and the in-cluster client mode use case is especially valuable for Jupyter notebooks, which are important for interactive use.

@mccheah, I think creating a separate ClientSchedulerBackend should not be required here for this more limited pseudo-client mode. Thoughts?

erikerlandson commented 7 years ago

This needs to be rebased.

erikerlandson commented 7 years ago

@mccheah @foxish my recollection from the last meeting was that there was a desire to create a ClientSchedulerBackend; was that a precondition for this?

foxish commented 7 years ago

The method of detecting whether we're running in the cluster LGTM; there may be a better method using the fabric8 client, but that isn't the best fit for the spark-submit code. Thinking about this a bit more, the only hacky part here is having to manually set the driver pod name. An InClusterClientSchedulerBackend could do that for us, making the experience a bit cleaner.

foxish commented 7 years ago

An alternate method of finding out whether we're running in-cluster is to check for /var/run/secrets/kubernetes.io/serviceaccount and see if we have credentials there that can talk to the apiserver. This is common, but both the env vars and checking that path are heuristics, and there can be situations where we can't tell in-cluster and out-of-cluster modes apart. It should cover most cases though.
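A rough sketch of how the two heuristics discussed here could be combined (illustrative only, not the actual patch):

// Illustrative in-cluster detection combining the env-var and service-account checks.
import java.nio.file.{Files, Paths}

def isRunningInCluster: Boolean = {
  val hasServiceEnv =
    sys.env.contains("KUBERNETES_SERVICE_HOST") && sys.env.contains("KUBERNETES_SERVICE_PORT")
  val hasServiceAccountToken =
    Files.exists(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token"))
  // Both are heuristics; either can misfire in unusual setups, as noted above.
  hasServiceEnv || hasServiceAccountToken
}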

sahilprasad commented 7 years ago

@foxish adding the extra check and including a lightweight InClusterSchedulerBackend (which should actually just be KubernetesClientSchedulerBackend, now that I think about it) sounds good to me. I'm thinking of keeping the existing env variable check and incorporating the credentials check as part of the new scheduler backend class.

An alternative might be to determine whether the pod network is routable (as per @tristanz's earlier comment). I'm not sure how best to do this, but it might cover more cases than the other option.

paulreimer commented 7 years ago

I'm not sure how to use this correctly, or it doesn't seem to be working for me.

I tried submitting jobs from the Jupyter web UI, as well as via exec within the Jupyter pod.

spark-submit \
  --deploy-mode client \
  --master k8s://http://127.0.0.1:8001 \
  --kubernetes-namespace spark \
  --conf spark.kubernetes.driver.docker.image=<> \
  --conf spark.kubernetes.executor.docker.image=<> \
  --conf spark.kubernetes.initcontainer.docker.image=<> \
  --conf spark.executor.instances=3 \
  --conf spark.app.name=spark-pi \
  --conf=spark.kubernetes.driver.pod.name=$HOSTNAME \
  gs://spark-resource-staging/pi.py 10

...

17/10/19 21:10:02 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
17/10/19 21:10:02 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 0 (PythonRDD[1] at reduce at /tmp/tmp8817217172572762368/pi.py:43) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
17/10/19 21:10:02 INFO KubernetesTaskSchedulerImpl: Adding task set 0.0 with 10 tasks
17/10/19 21:10:17 WARN KubernetesTaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

And it repeats that last message indefinitely. I also do not see any executor pods created with either method. The same command works in cluster mode within the Jupyter pod, just not in client mode. My Jupyter pod is in the default namespace, and I would like the Spark executors to be in the spark namespace.

sahilprasad commented 7 years ago

It's been a while since I've looked at this, but can you try enabling dynamic allocation when running in client mode?

paulreimer commented 7 years ago

Oh! I tried just now with everything in the default namespace and it worked.

paulreimer commented 7 years ago

It all works now, and is glorious.

Two important things I needed to change (my bad): I was using a kubectl-proxy sidecar container with --master=k8s://http://127.0.0.1:8001; now I have no sidecar and I am using --master k8s://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT.

The second thing is that I thought/hoped those environment variables would be expanded in the k8s Deployment args: ["$FOO", "$BAR"], but I suppose that is not the case, or I'm getting the syntax wrong. With the exact same command baked into my Jupyter Docker image's CMD, it all worked great. I am able to run within the Jupyter notebook in "client" mode, and I do see executors launched and results coming back to Jupyter. It's really cool, tbh.

paulreimer commented 7 years ago

I should note that dynamic allocation + shuffle service also works in this setup.

The only restriction I can see so far is that I had to locate all the resources within the same namespace as Jupyter (I would have preferred the Spark executors to be in a separate, isolated namespace).
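For reference, a minimal sketch of the standard Spark settings involved in that setup; the fork's Kubernetes-specific shuffle-service settings (shuffle pod labels/namespace) are omitted here:

// Sketch only: enable dynamic allocation backed by the external shuffle service.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")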

sahilprasad commented 7 years ago

@paulreimer Let me know if there are any documentation changes that can be made to make the process of starting up with "client" mode easier!

As for scheduling executors in a different namespace from the Jupyter notebook, I am not sure how to go about resolving that. If you make any progress on this front, let me know!

mccheah commented 6 years ago

Was thinking about this a little more and it makes sense. But there's some more food for thought here.

I recently learned that TCP connections in Spark are only initiated from executor->executor and executor->driver. There is no connection from driver->executor. That means that theoretically, if the driver is reachable over a hostname or IP address from the executor pods, then client mode can be supported as long as the driver is not behind a firewall. So it may not be the case that the driver has to be running from within the context of a pod. In this case, the idea of client mode would be a lot closer to the notion of client mode in the other cluster managers.

We should adjust this accordingly so that:

And then we should try to test this on real clusters where the driver is just on a host that is in the cluster, not in a container.

mccheah commented 6 years ago

I also think that the cluster-mode scheduler backend has some conventions that are specific to cluster mode, such as the init container that downloads dependencies that the submission client sent over. A KubernetesClientSchedulerBackend should remove all the components of the KubernetesClusterSchedulerBackend that only apply to cluster mode and, more importantly, that do not apply when no submission client deploys the application. Kubernetes credentials provisioning should also be reworked in client mode. The spark.kubernetes.authenticate.driver.mounted prefix doesn't make as much sense in client mode; it should just be a straight spark.kubernetes.authenticate.oauthToken without any prefix.

echarles commented 6 years ago

I have tried this branch running in client deploy mode from an in-cluster pod (after fixing the small merge conflict) on a local k8s cluster created with kubeadm.

I have run interactive spark-shell, spark-submit and Zeppelin with 3 executors (will open a PR to document the needed configuration for Zeppelin).

It works well overall for what I have done (basic DataFrame processing in Scala, no additional external dependencies).

Three questions (mainly around pod naming):

  1. Executor pod names all have the same pattern (spark-exec-1, spark-exec-2, ...), which does not follow the classical behavior (e.g. cluster-mode spark-submit creates names like spark-pi-1510391597235-exec-3).

This leads to an "AlreadyExists" exception if you want to instantiate multiple Spark REPLs on the same K8s cluster.

2017-11-11 09:57:27 ERROR KubernetesClusterSchedulerBackend:91 - Failed to allocate executor pod.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://kubernetes.default.svc/api/v1/namespaces/default/pods. Message: pods "spark-exec-1" already exists. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=null, kind=pods, name=spark-exec-1, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=pods "spark-exec-1" already exists, metadata=ListMeta(resourceVersion=null, selfLink=null, additionalProperties={}), reason=AlreadyExists, status=Failure, additionalProperties={}).
  2. I receive a warning message when running in client mode with the shuffle service enabled (no warning in my setup when running in cluster mode):
org.apache.spark.SparkException: Ambiguous specification of shuffle service pod. Found multiple matching pods: shuffle-n6qpv, 192.168.93.173 on datalayer-001
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExternalShuffleManagerImpl.org$apache$spark$scheduler$cluster$k8s$KubernetesExternalShuffleManagerImpl$$addShufflePodToCache(KubernetesExternalShuffleManager.scala:115)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExternalShuffleManagerImpl$$anon$1.eventReceived(KubernetesExternalShuffleManager.scala:100)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExternalShuffleManagerImpl$$anon$1.eventReceived(KubernetesExternalShuffleManager.scala:94)

It is just a warning, and the REPL is then created correctly. Just mentioning it since the behavior is different in cluster mode.

  3. I receive an error when looking at executor pod details in the web Dashboard (Internal Server Error (500): Unknown reference kind Pod; see screenshot). This error only shows for pods created by the Spark driver in in-cluster client mode.


echarles commented 6 years ago

PS: We can forget my third point (Internal Server Error (500)). If I click through from the page showing the list of pods, I get the logs; if I click the logs icon from the detail page, I get the 500 error. I guess this is a dashboard or k8s issue.

erikerlandson commented 6 years ago

@echarles when you did the tests you described above, did you try non-pod clients, vis-à-vis @mccheah's comment about one-way executor-to-driver connections?

echarles commented 6 years ago

@erikerlandson @mccheah I have recompiled to relax the condition on k8s client mode and made a few (unsuccessful) attempts at submitting to a remote spark-k8s cluster from my laptop (out of a pod, spark-submit + spark-shell). I receive:

Exception in thread "main" org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2770)
(snip)
Caused by: java.io.FileNotFoundException: /var/run/secrets/kubernetes.io/serviceaccount/token (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)

I have tested with and without the spark.kubernetes.driver.pod.name property, and with spark.dynamicAllocation.enabled=true/false and spark.shuffle.service.enabled=true/false.

mccheah commented 6 years ago

Are we still continuing work on this? I still have some concerns - see https://github.com/apache-spark-on-k8s/spark/pull/456#issuecomment-343006176

tristanz commented 6 years ago

@mccheah I agree with your comment, it is a very good point. @sahilprasad can speak for himself, but I know he's back in school so may not have bandwidth to pick this up. Is anybody else interested?

foxish commented 6 years ago

I know @echarles was working on related things - would you be willing to take over?

echarles commented 6 years ago

Yep, this is listed in my todos, just after #463

sahilprasad commented 6 years ago

@tristanz @echarles Tristan is right — since I'm back in school, I don't have the time to take this on right now. All you, Eric! I'll definitely be keeping track of the progress of this feature.

echarles commented 6 years ago

@sahilprasad enjoy, and good luck with your exams (if any; I think it is that time of year).

The good news is that I have been able to quickly hack the code to have a running spark k8s client out-of-cluster.

The bad news is that the executor initiates the connection to the driver. When I run from my laptop against a cluster in the cloud, the executor pod is created but fails immediately while trying to connect to my local IP address (192.168.1.7).

2017-12-14 10:37:53 INFO  SecurityManager:54 - Changing modify acls groups to: 
2017-12-14 10:37:53 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1904)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
2017-12-14 10:39:53 ERROR RpcOutboxMessage:70 - Ask timeout before connecting successfully
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from 192.168.1.7:35789 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
    at scala.util.Try$.apply(Try.scala:192)

I then created a separate local cluster on my laptop and, from my IDE, I can run SparkPi and the Spark shell in k8s client mode.

I haven't tried on a separate machine, but if the cluster nodes have ip:port reachability to the driver, I guess it will be OK (need to test...).

As a first iteration, I'd like to open a PR based on your feedback on these questions:

  1. I am used to running Spark in yarn-client mode, and in that case the YARN resource manager exposes the Application Master, which proxies the Spark UI (so you browse e.g. http://<resource-manager>:8088/proxy/application_1513256521420_0002/jobs). With YARN, the driver process remains on the local host, just like with what I have done here. However, the Spark UI (typically on port 4040) is not proxied, and a user has to browse http://<localhost>:4040/application_1513256521420_0002/jobs. Not sure how Mesos behaves here (I remember I had to create an SSH tunnel to browse the UI, so I would say there is no proxy, but I no longer have access to a Mesos-based cluster).

  2. The needed changes are about creating the Kubernetes client with the correct credentials, passing a null driver pod, and disabling a few properties. So instead of creating a separate KubernetesClientSchedulerBackend, what about configuring and renaming the existing KubernetesClusterSchedulerBackend (let's say simply KubernetesSchedulerBackend)?

  3. Regarding the in-cluster client, the logic should reside in our KubernetesSchedulerBackend, so the Kubernetes client (if it supports it) could do the job. Or should we test for /var/run/secrets/kubernetes.io/serviceaccount? Any pointers and concrete methods welcome.

  4. For now I disable the OAuth properties. @mccheah, can you shed some light on this? (I could point to a file, but I don't see any information about OAuth in my local Kubernetes configuration.)

  5. The place to load the certificate and keys must also be defined. ~/.kube seems a reasonable choice to me to start with.

  6. The executor pod name must be updated (for now, it is always spark-executor-1...)

  7. Anything else I miss?

liyinan926 commented 6 years ago

The needed changes are about creating the Kubernetes client with the correct credentials, passing a null driver pod, and disabling a few properties. So instead of creating a separate KubernetesClientSchedulerBackend, what about configuring and renaming the existing KubernetesClusterSchedulerBackend (let's say simply KubernetesSchedulerBackend)?

What do you mean by passing a null driver pod? KubernetesClusterSchedulerBackend currently gets the driver pod object from the API server and uses that to create an OwnerReference for the executor pods. Do you mean ignoring this logic if the driver pod does not exist?

The place to load the certificate and keys must also be defined. ~/.kube seems a reasonable choice to me to start with.

Yes, the client can be pointed to ~/.kube/config for configuration and use that to create a client for the API server.

There's one thing that's essential for in-cluster client mode: the driver headless service that the executors use to connect to the driver. In the current implementation, the submission client is solely responsible for creating the service. We need to think about who is in charge of creating it in in-cluster client mode.

echarles commented 6 years ago

In client mode there is indeed no driver pod, so yes, I simply skip the current logic (OwnerReference...).

I will default to ~/.kube/config to get the credentials needed to create the k8s client.

For the in-cluster client mode, what do you mean by the driver headless service? In-cluster is nearly the same as out-of-cluster client mode; the only difference I see is the way to get the credentials needed to create the k8s client (in-cluster client mode does not have ~/.kube/config).

liyinan926 commented 6 years ago

For the in-cluster client mode, what do you mean by the driver headless service? In-cluster is nearly the same as out-of-cluster client mode; the only difference I see is the way to get the credentials needed to create the k8s client (in-cluster client mode does not have ~/.kube/config).

The driver headless service is a Kubernetes headless service that the executors use to connect to the driver pod. The service basically gives a DNS name for the driver. Executors connect to the driver through the service instead of the driver IP address. See more details in https://github.com/apache-spark-on-k8s/spark/pull/483.

In the in-cluster client mode as described in this PR, the headless service won't be created because the submission client is not invoked. This is problematic and should be addressed.
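For context, a sketch of the kind of headless service the submission client creates in cluster mode, expressed with the fabric8 client the project already uses; the service name, selector label, and port below are illustrative, not the actual implementation. Setting clusterIP to None is what makes the service headless, so its DNS name resolves directly to the driver pod.

import io.fabric8.kubernetes.api.model.ServiceBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Illustrative only: a headless service selecting a hypothetically-labeled driver pod.
val client = new DefaultKubernetesClient()
val driverService = new ServiceBuilder()
  .withNewMetadata()
    .withName("spark-driver-svc")              // hypothetical service name
  .endMetadata()
  .withNewSpec()
    .withClusterIP("None")                     // headless: DNS resolves to the pod IP
    .addToSelector("spark-driver", "my-app")   // hypothetical driver pod label
    .addNewPort()
      .withName("driver-rpc")
      .withPort(7078)                          // assumed driver RPC port
    .endPort()
  .endSpec()
  .build()
client.services().inNamespace("default").create(driverService)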

echarles commented 6 years ago

In the in-cluster client mode as described in this PR, the headless service won't be created because the submission client is not invoked. This is problematic and should be addressed.

I now see your point. I guess we would have two different step paths: the current one for cluster mode, and a slightly updated one for client mode with a ClientDriverConfigurationStep that simply returns a value that could have been set [1] as a k8s env var or [2] as a property in the SparkContext. Any thoughts?

mccheah commented 6 years ago

I am used to running Spark in yarn-client mode, and in that case the YARN resource manager exposes the Application Master, which proxies the Spark UI (so you browse e.g. http://<resource-manager>:8088/proxy/application_1513256521420_0002/jobs). With YARN, the driver process remains on the local host, just like with what I have done here. However, the Spark UI (typically on port 4040) is not proxied, and a user has to browse http://<localhost>:4040/application_1513256521420_0002/jobs. Not sure how Mesos behaves here (I remember I had to create an SSH tunnel to browse the UI, so I would say there is no proxy, but I no longer have access to a Mesos-based cluster).

I don't think we should have to do anything special here; it's up to the JVM that runs the Spark application how to expose it, and we can't enforce anything here AFAIK.

The needed changes are about creating the Kubernetes client with the correct credentials, passing a null driver pod, and disabling a few properties. So instead of creating a separate KubernetesClientSchedulerBackend, what about configuring and renaming the existing KubernetesClusterSchedulerBackend (let's say simply KubernetesSchedulerBackend)?

I'm worried that introducing too many of these - edit: "these" being properties that are present or null based on cluster/client mode - will make it opaque which properties are specific to cluster mode and which are specific to client mode. Having separate KubernetesClusterSchedulerBackend and KubernetesClientSchedulerBackend classes would make it clear which aspects of the schedulers and their properties are specific to client versus cluster mode. I'd expect most of the code to be shared in the parent class, KubernetesSchedulerBackend, with cluster mode and client mode each setting up their own bootstraps. You could accomplish something similar with composition instead of inheritance, which is usually preferred. For example one could have a KubernetesSchedulerBackend which is a concrete class, that takes in a KubernetesDeployModeSpecificHandlers that is a trait and subclassed in Client and Cluster implementations, and go from there. But I think using Option or null everywhere would result in a proliferation of properties that are ambiguous in which mode they should apply to and why.
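A rough sketch of the composition approach described here; all class and method names below are hypothetical, chosen only to illustrate the shape of the design:

// Hypothetical sketch: one concrete backend composed with deploy-mode-specific handlers.
import io.fabric8.kubernetes.api.model.Pod

trait KubernetesDeployModeSpecificHandlers {
  // Cluster mode resolves the driver pod (used for executor OwnerReferences);
  // client mode may have no driver pod at all.
  def driverPod: Option[Pod]
  // Deploy-mode-specific executor pod setup, e.g. the init-container bootstrap
  // that only applies when a submission client staged dependencies.
  def bootstrapExecutorPod(pod: Pod): Pod
}

class ClusterModeHandlers extends KubernetesDeployModeSpecificHandlers {
  def driverPod: Option[Pod] = ???              // look up spark.kubernetes.driver.pod.name
  def bootstrapExecutorPod(pod: Pod): Pod = ??? // attach init-container, mounted credentials, ...
}

class ClientModeHandlers extends KubernetesDeployModeSpecificHandlers {
  def driverPod: Option[Pod] = None
  def bootstrapExecutorPod(pod: Pod): Pod = pod
}

// Shared logic lives here; only the handlers differ between modes.
class KubernetesSchedulerBackend(handlers: KubernetesDeployModeSpecificHandlers /* , ... */)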

Regarding the in-cluster client, the logic should reside in our KubernetesSchedulerBackend, so the Kubernetes client (if it supports it) could do the job. Or should we test for /var/run/secrets/kubernetes.io/serviceaccount? Any pointers and concrete methods welcome.

Are you referring specifically to how we should configure the Kubernetes client? If so, I think we should just configure everything using Spark properties. The naming of these properties is tricky, because cluster mode namespaces properties between submission and driver, whereas no such division should exist in client mode. But maybe just using the driver prefix for everything will be sufficient even if it's a redundant label in client mode, so to speak.

Edit: We should also support loading from on-disk, and it's similar to spark.hadoop. properties vs. using HADOOP_CONF_DIR for Hadoop configuration - this is noted again below.

For now I disable the OAuth properties. @mccheah, can you shed some light on this? (I could point to a file, but I don't see any information about OAuth in my local Kubernetes configuration.)

Kubernetes clusters can support OAuth tokens for identity verification and authorization. We need to allow users to pass in either a token file or token text itself. The latter should be redacted somehow so that we don't print it in e.g. spark.logConf.

The place to load the certificate and keys must also be defined. ~/.kube seems a reasonable choice to me to start with.

We should support loading configuration from both the Spark config and the on-disk kube configs. This is similar to how Spark allows passing Hadoop properties via the files in HADOOP_CONF_DIR and also by setting properties prefixed with spark.hadoop..
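To illustrate the two sources side by side, a hedged sketch of building the fabric8 client either from a Spark property (using the unprefixed spark.kubernetes.authenticate.oauthToken name proposed earlier in this thread) or from the on-disk kube config:

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
import org.apache.spark.SparkConf

// Sketch only: prefer an explicit Spark property, fall back to fabric8's
// auto-configuration (~/.kube/config or the in-cluster service account).
def buildKubernetesClient(sparkConf: SparkConf): DefaultKubernetesClient = {
  val config = sparkConf.getOption("spark.kubernetes.authenticate.oauthToken") match {
    case Some(token) =>
      new ConfigBuilder()
        .withMasterUrl(sparkConf.get("spark.master").stripPrefix("k8s://"))
        .withOauthToken(token) // should be redacted from logged configuration, as noted above
        .build()
    case None =>
      new ConfigBuilder().build() // auto-configures from ~/.kube/config or in-cluster credentials
  }
  new DefaultKubernetesClient(config)
}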

The executor pod name must be updated (for now, it is always spark-executor-1...)

Why does this have to change? It should contain the app id and the executor id.

mccheah commented 6 years ago

The driver headless service is a Kubernetes headless service that the executors use to connect to the driver pod. The service basically gives a DNS name for the driver. Executors connect to the driver through the service instead of the driver IP address. See more details in #483.

To take a step back here, I don't think we should be creating a headless service at all for client mode. The idea behind client mode is that there's the inherent assumption that the driver is available over some hostname that is routable from all of the executors. @echarles - to your point that this code is unusable from a laptop - if we think about it, if one had an equivalent YARN cluster with the same firewall settings and your laptop was sitting in the same place with its own firewalls, then I'd expect our very same client mode applications that don't work with that YARN cluster to also not work in Kubernetes.

For client mode our measure of success should be: If I colocated my Kubernetes kubelets with existing YARN nodemanagers that I've been using for my client mode YARN applications, would I also be able to run my client mode Kubernetes applications from the same host where I run my YARN client mode applications? Conversely, if there was a networking setup that would have prohibited us from running in YARN client mode, then we also don't have to be concerned with the analogous scenarios in Kubernetes - such as e.g. the laptop->cluster case.

The situation I was concerned about before I found out what I did in https://github.com/apache-spark-on-k8s/spark/pull/456#issuecomment-343006176 was that the driver would need to reach out to the executors to hand them work. That would create a discrepancy between YARN and Kubernetes, because executors running in YARN usually are running on a fleet of hosts that have a unified firewall setting and are either all exposed at once to the driver or none are. In the latter case, it's expected that the client mode application shouldn't work anyways, but one could deploy their application inside the YARN cluster to get inside the firewall and work from there. In Kubernetes that would have been extremely difficult to do because any given individual pod is not routable from outside the cluster by default.

But given our findings, the expectation is now only that the driver needs to be reachable from the pods, which is far more manageable. How that driver would be exposed I would expect to be different case by case, considering a laptop versus deploying onto a server, vs. running the application inside a pod that exposes a headless service, etc. Thus the user that submits the application should have decided a-priori how their driver would be reachable by the executors. It's not immediately obvious that Spark itself has to determine that connectivity mechanism.
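In concrete terms, that a-priori decision usually comes down to the standard Spark driver networking properties; a minimal sketch with placeholder values:

// Sketch: a client-mode driver on a host reachable from the executor pods.
// spark.driver.host must resolve, from the executors, to the driver's address;
// pinning the ports makes it easier to allow them through a firewall.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.host", "driver.example.internal") // placeholder routable hostname
  .set("spark.driver.port", "7077")                    // placeholder fixed RPC port
  .set("spark.blockManager.port", "7079")              // placeholder fixed block manager port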

echarles commented 6 years ago

I don't think we should have to do anything special here; it's up to the JVM that runs the Spark application how to expose it, and we can't enforce anything here AFAIK.

spark.driver.memory is an example of a property we want to be sure is taken into account. I guess spark-core does the job for us, but it's worth double-checking.

You could accomplish something similar with composition instead of inheritance, which is usually preferred. For example one could have a KubernetesSchedulerBackend which is a concrete class, that takes in a KubernetesDeployModeSpecificHandlers that is a trait and subclassed in Client and Cluster implementations, and go from there. But I think using Option or null everywhere would result in a proliferation of properties that are ambiguous in which mode they should apply to and why.

+1

Are you referring specifically to how we should configure the Kubernetes client? If so, I think we should just configure everything using Spark properties. The naming of these properties is tricky, because cluster mode namespaces properties between submission and driver, whereas no such division should exist in client mode. But maybe just using the driver prefix for everything will be sufficient even if it's a redundant label in client mode, so to speak.

I imagine the list of spark.driver.kubernetes... properties will be documented... We will iterate on this. But I was referring more to an automatic way to detect whether the client is in or out of the cluster. This state should not be passed via a property, as the system can detect it by itself.

Kubernetes clusters can support OAuth tokens for identity verification and authorization. We need to allow users to pass in either a token file or token text itself. The latter should be redacted somehow so that we don't print it in e.g. spark.logConf.

So this is optional and should be passed via a property.

We should support loading configuration from both the Spark config and the on-disk kube configs. This is similar to how Spark allows passing Hadoop properties via the files in HADOOP_CONF_DIR and also by setting properties prefixed with spark.hadoop..

Back to the property list...

Why does this have to change? It should contain the app id and the executor id.

It should, but it does not: all executors created in client mode get the generic name spark-executor-X (see the screenshot a bit above).
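For reference, a sketch of the naming convention being asked for here, matching the cluster-mode pattern mentioned earlier (e.g. spark-pi-1510391597235-exec-3); illustrative only:

// Illustrative: derive unique executor pod names from the application id.
def executorPodName(appId: String, executorId: Int): String =
  s"$appId-exec-$executorId"

// executorPodName("spark-pi-1510391597235", 3) == "spark-pi-1510391597235-exec-3"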

mccheah commented 6 years ago

But I was referring more to an automatic way to detect whether the client is in or out of the cluster. This state should not be passed via a property, as the system can detect it by itself.

I think my idea here is similar in spirit to https://github.com/apache-spark-on-k8s/spark/pull/456#issuecomment-351827245 (though I see we almost posted at the same time =)). I don't think Spark itself should need to determine whether the application is in-cluster vs. out-of-cluster; the contract just says that the driver running in client mode needs to be reachable by the executor pods, and it's up to the user to determine how to provide that connectivity.

liyinan926 commented 6 years ago

To take a step back here, I don't think we should be creating a headless service at all for client mode. The idea behind client mode is that there's the inherent assumption that the driver is available over some hostname that is routable from all of the executors. @echarles - to your point that this code is unusable from a laptop - if we think about it, if one had an equivalent YARN cluster with the same firewall settings and your laptop was sitting in the same place with its own firewalls, then I'd expect our very same client mode applications that don't work with that YARN cluster to also not work in Kubernetes.

Yes, I meant this only applies to in-cluster client mode. And I think for in-cluster client mode, users should not need to worry about how to set up the connectivity between the executors and the driver.

mccheah commented 6 years ago

Yes, I meant this only applies to in-cluster client mode. And I think for in-cluster client mode, users should not need to worry about how to set up the connectivity between the executors and the driver.

But do we need to distinguish in-cluster client mode from plain client mode? Again, if we treat client mode as the contract that "the driver needs to be reachable by its hostname", it's left up to the user to determine how to do that.

liyinan926 commented 6 years ago

But do we need to distinguish in-cluster client mode from plain client mode? Again, if we treat client mode as the contract that "the driver needs to be reachable by its hostname", it's left up to the user to determine how to do that.

Ideally, yes. In the case of the headless service in in-cluster client mode, are you suggesting that users manually create it as part of starting the client pod?

mccheah commented 6 years ago

It would be a first for Spark in client mode to determine its own connectivity. I don't think it's strictly necessary for Spark to do this. Users can create the headless service themselves.

liyinan926 commented 6 years ago

OK, I can buy that. This needs to be documented clearly.

echarles commented 6 years ago

FYI, I made a first step a few days ago but had no time to finalize it (you know, end of year...). It works for me in out-of-cluster mode, and I will continue working on in-cluster mode in the coming days.

https://github.com/apache-spark-on-k8s/spark/compare/branch-2.2-kubernetes...datalayer-contrib:client-mode-datalayer-2

foxish commented 6 years ago

Thanks @echarles. This item is a P0 once we rebase our fork on upstream since it's a very popular feature request.

echarles commented 6 years ago

I have a working version that covers the 6 scenarios:

  1. spark-submit cluster-mode in-cluster
  2. spark-submit cluster-mode out-cluster
  3. spark-submit client-mode in-cluster
  4. spark-submit client-mode out-cluster
  5. spark-shell client-mode in-cluster
  6. spark-shell client-mode out-cluster

This can be viewed on https://github.com/apache-spark-on-k8s/spark/compare/branch-2.2-kubernetes...datalayer-contrib:client-mode-datalayer-2

Note to myself:

  • Get rid of the hardcoded path for cert and key in case of out-cluster
  • Polish format
  • Document to ensure understanding of the behavior.
  • Unit test
  • Integration test

Questions to the community:

  1. It works without any change to the headless driver service. I would say that if it does the job, we don't have to worry about this... This may sound a bit naive, so I will try to provide clarity and evidence in the document I will write.
  2. Happy to get feedback on the already developed code.
  3. Once I have something more final, should I open a PR on apache-spark-on-k8s/spark so we can prepare something, or directly engage on the Apache JIRA, repos, and mailing lists?
foxish commented 6 years ago

That sounds awesome, thanks Eric. A brief design doc would help us all get on the same page regarding the mechanics of it, I think. Then we can use the weekly meeting to discuss it, and then you can send a PR against upstream and start a JIRA. That seems like a good way to proceed here. Open to other ideas from others, though.

echarles commented 6 years ago

@foxish The initial draft of the doc can be read at apache-spark-on-k8s/userdocs#25.

It is a bit dense, but the topics reviewed are IMHO needed. I will update it based on your feedback (comment on the PR or reply here). Thanks.

echarles commented 6 years ago

Just pushed an update to the doc apache-spark-on-k8s/userdocs#25

fanzhen commented 6 years ago

@echarles Thanks for providing the code; I'm studying it. As far as my team is concerned, it's necessary to implement and thoroughly test the client mode code. Our plan is to make a huge production-level migration from YARN to k8s, which may involve thousands of machines.