foxish / spark

Apache Spark on Kubernetes moved to
https://github.com/apache-spark-on-k8s/spark/
Apache License 2.0

Kubernetes ThirdPartyResource for tracking Spark Jobs #3

Open foxish opened 7 years ago

foxish commented 7 years ago

Issues:

Shortcomings of the current method:

Proposed Solution:

A ThirdPartyResource to keep track of the execution state (pending/running/failed) of each SparkJob, failure reasons if any, the identity of the driver and executor pods associated, as well as configuration metadata associated with that job (number of executors, memory per executor, etc).

metadata:
  name: spark-job.kubernetes.io
  labels:
    resource: spark-job
    object: spark
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "A resource that manages a spark job"
versions:
  - name: v1

The cluster administrator is responsible for creating this resource, which makes a new API endpoint available in Kubernetes. This TPR would enable us to create objects of the kind SparkJob and store JSON within them. Each such object would be associated with a single spark job, and would store all the status and metadata associated with it. The driver pod is responsible for the life-cycle of the SparkJob object, from creation till deletion.

A sample object of the above kind looks like the following:

{
    "apiVersion": "kubernetes.io/v1",
    "image": "driver-image",
    "kind": "SparkJob",
    "metadata": {
        "name": "spark-driver-1924",
        "namespace": "default",
        "selfLink": "/apis/kubernetes.io/v1/namespaces/default/sparkjobs/spark-driver-1924",
        "uid": "91022bc2-a71d-11e6-a4be-42010af00002",
        "resourceVersion": "765519",
        "creationTimestamp": "2016-11-10T08:13:31Z"
    },
    "num-executors": 10,
    "state": "completed",
    "driver-pod": "driver-2ds9f"
    ...
    ...
}
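A minimal sketch of how such an object and its endpoint path could be assembled, written in Python for brevity. The helper names `spark_job_path` and `build_spark_job` are hypothetical; the path layout follows the `selfLink` in the sample above, and the field names are the ones shown there.

```python
API_GROUP = "kubernetes.io"  # taken from the sample's apiVersion
API_VERSION = "v1"


def spark_job_path(namespace, name=None):
    """Return the REST path of the SparkJob collection, or of one object."""
    base = f"/apis/{API_GROUP}/{API_VERSION}/namespaces/{namespace}/sparkjobs"
    return base if name is None else f"{base}/{name}"


def build_spark_job(name, namespace, image, num_executors, state, driver_pod):
    """Build the JSON body for a SparkJob object (hypothetical helper).

    Server-populated metadata (uid, resourceVersion, selfLink,
    creationTimestamp) is deliberately left out.
    """
    return {
        "apiVersion": f"{API_GROUP}/{API_VERSION}",
        "kind": "SparkJob",
        "metadata": {"name": name, "namespace": namespace},
        "image": image,
        "num-executors": num_executors,
        "state": state,
        "driver-pod": driver_pod,
    }
```

The body would be POSTed to the collection path and later read or updated at the per-object path.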

The driver pod has complete visibility into progress of the job, and can set the status of its SparkJob object. The driver can also watch this resource for configuration changes which may be triggered by the user/cluster administrator. Killing a spark-job can be performed by destroying the associated SparkJob object, which will cause the driver pod to terminate its executors and clean up gracefully.
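As a rough illustration of the watch-driven behavior described above, here is a sketch in Python of how a driver might classify watch events on its own SparkJob object. The event shape (`type` plus `object`) mirrors Kubernetes watch events; the function name and the returned action names are assumptions made for this example only.

```python
def react_to_event(event, job_name, executors):
    """Decide what the driver should do for one watch event.

    `event` mimics a Kubernetes watch event: {"type": ..., "object": {...}}.
    Returns an (action, payload) tuple; the action names are hypothetical.
    """
    obj = event.get("object", {})
    if obj.get("metadata", {}).get("name") != job_name:
        return ("ignore", None)  # event belongs to some other SparkJob
    if event.get("type") == "DELETED":
        # The SparkJob object was destroyed: terminate executors and exit.
        return ("shutdown", list(executors))
    if event.get("type") == "MODIFIED":
        # A user or admin changed the configuration, e.g. the executor count.
        return ("reconfigure", obj.get("num-executors"))
    return ("ignore", None)
```

A real driver would feed this from a long-running watch request on the SparkJob endpoint and act on the returned tuple.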

Further thought:

foxish commented 7 years ago

cc @tnachen @erikerlandson @erictune

tnachen commented 7 years ago

It sounds like a great way to provide visibility into Spark and operate it. I need to think it through a bit more, but some questions for now:

erikerlandson commented 7 years ago

@foxish TPR looks like a cool kube feature! Regarding lifecycle visibility and mgmt, I'm increasingly interested in adding some new published metrics to Spark. Particularly, having Executors report some new metrics that reflect the size of their queued job backlog. My main interest in that is so it could be harvested for use by an HPA for executor pods. In other words, it would allow external scaling logic to use the same flavor of information that Spark's dynamic-executor logic uses, but since the logic could be external, it opens up a large solution space of scaling logic without requiring that logic to live inside Spark (let Spark do what it does well, and not try to be in the business of advanced scaling and sharing features)(*)

But you could take this idea further, and publish other metrics that provide increased visibility into the job progress. Some of this may already be available from the REST API that already exists in Spark? Spark's web UI gets its job progress information from somewhere.

(*) This would also require the ability to externally signal a Spark executor into graceful shutdown, but that is outside the scope of this topic
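To make the external-scaling idea concrete, here is a small Python sketch that applies the proportional rule used by the Kubernetes HPA (desired = ceil(current * currentMetric / targetMetric)) to a per-executor backlog metric. The metric itself, the function name, and the bounds are assumptions; Spark does not publish such a backlog metric in this discussion.

```python
import math


def desired_executors(backlogs, target_backlog, min_n=1, max_n=100):
    """Return a desired executor count from per-executor backlog sizes.

    `backlogs` holds one hypothetical "queued tasks" reading per running
    executor; `target_backlog` is the backlog we would like each to carry.
    """
    if target_backlog <= 0:
        raise ValueError("target_backlog must be positive")
    current = len(backlogs)
    if current == 0:
        return min_n  # nothing running yet: start from the floor
    average = sum(backlogs) / current
    desired = math.ceil(current * average / target_backlog)
    return max(min_n, min(max_n, desired))
```

For example, four executors each reporting a backlog of 20 against a target of 10 would yield a desired count of 8.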

tnachen commented 7 years ago

Job progress information is all available through the Spark Listener interface, which gets reported from the job itself. I think most of the dynamic allocation information is already published through JMX.

Anyhow, seems like an interesting idea but I don't know how this fits with this proposed TPR?

iyanuobidele commented 7 years ago

It sure sounds like a great way to monitor the state of spark jobs!

Are we going to have a TPR per namespace.....

I believe k8s doesn't currently support non-namespaced objects, so for this to work we need to create this TPR in the same namespace as the Spark job about to be deployed. So this might answer some part of @tnachen's first question. Since we already depend on the namespace specified at runtime or in the Spark conf file, I agree with @foxish's initial idea of creating the TPR ahead of time in this same namespace.

so not all Spark jobs across all namespaces are visible and manageable?

If by this you mean the new kind just created by the TPR, I believe you should be able to do kubectl get sparkjob --all-namespaces, provided that the TPRs in all namespaces have the same resource type name. (I haven't really tried this out.)

erikerlandson commented 7 years ago

@tnachen I was proposing some possible alternatives to increasing visibility into driver+executor(s) state (from the k8s layer). (alternatives that might be aligned with some of my other interests)

But TPR is idiomatic w.r.t. a kube environment. I might turn it around, and consider publishing TPR objects to represent the loading state of each executor.

Can HPA using custom metrics be aimed at one of these ThirdPartyResource endpoints?

iyanuobidele commented 7 years ago

Some questions:

The driver pod has complete visibility into progress of the job, and can set the status of its SparkJob object.

How do you see implementing this? Maybe explain a little further what the relationship between the pod and the sparkjob resource type is. Is this like a pod and job kind of relationship, and if so, are there any additional reasons why we can't use the existing job kind instead of creating a new resource type?

erikerlandson commented 7 years ago

@iyanuobidele, IIUC the idea is to have the scheduler (and/or scheduler backend) publish these objects into the namespace the app is running on, presumably using the fabric8 client, with information on the current state of the application

foxish commented 7 years ago

@iyanuobidele The TPR object is meant to store state about the running SparkJob and act as a data model. There is benefit in looking at individual entities as "SparkJobs" in the kubernetes system, as opposed to simply driver/executor pods IMO. Exposing the state of the spark job to kubernetes also allows us to write custom controllers which can make cluster-level decisions as @erikerlandson mentioned.

Is this like a pod and job kind of relationship, and if so are there any additional reasons why we can't use the existing job kind instead of creating a new resource type ? Maybe explain a little further what the relationship between the pod and the sparkjob resource type is

A Kubernetes Job is managed by the JobController, which deals with pod creation and deletion decisions. With native integration, we let Spark act as the "controller". So, it's not quite the pod-job type of relationship. The TPR is really just being used as a key-value store which the driver writes to, and all other components read from. I'm unsure if there is a use-case where we want to change the configuration of a job after launching the driver.

The driver pod would create the TPR object when it comes up and update it at regular intervals with metrics and state. Since we are using namespaces as the mechanism to isolate users, it makes sense for the TPR object to live in the same namespace as that user's spark driver and executor pods. The Spark "JobID" could be associated with the TPR object, and be used for finding status, summarizing results, etc and we would not need to directly talk to pods.

tnachen commented 7 years ago

I'm also wondering if we should make it a hard requirement for the TPR to exist for Spark jobs to work on k8s. I don't think there is any reason why the TPR should be a problem for anyone, but just want to make sure this is explicit and we should probably check from the driver side and have a sensible message.

erikerlandson commented 7 years ago

I'd prefer at least the option for the submission client to create the TPR automatically, unless it requires admin privs like the service acct does. It should be easy to do in the fabric8 client, assuming it has a fluent constructor for it.

foxish commented 7 years ago

The ThirdPartyResource itself (which creates the additional API endpoint) needs to be created exactly once for the entire cluster. That would be a prerequisite I imagine. The TPR objects on the other hand could be created automatically by the submission client.

erikerlandson commented 7 years ago

@foxish, if it's not idempotent that makes it slightly harder, but I assume the code can check for existence first if need be. If I'm reading correctly, a TPR gets created on a per-namespace basis.

foxish commented 7 years ago

A TPR (for example, the one of kind 'SparkJob' in the issue) itself is not namespaced. The third party objects which are created of that SparkJob resource we introduce, however, are namespaced. If the TPR is not created for that cluster, Spark will be unable to create Third Party objects in which we want to store state and as you said, it can detect this and complain about it.
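The "detect this and complain" step could look roughly like the Python sketch below. It assumes the cluster-scoped TPR is readable at `/apis/extensions/v1beta1/thirdpartyresources/<name>` (consistent with the `extensions/v1beta1` apiVersion in the issue) and takes a caller-supplied `get_status` function that returns an HTTP status code, so no particular HTTP client is implied. `MissingTPRError` and `check_tpr` are hypothetical names.

```python
TPR_PATH = "/apis/extensions/v1beta1/thirdpartyresources/{name}"  # assumed path


class MissingTPRError(RuntimeError):
    """Raised when the cluster admin has not created the SparkJob TPR."""


def check_tpr(get_status, name="spark-job.kubernetes.io"):
    """Verify that the TPR exists; return its path or raise a clear error.

    `get_status(path)` must perform a GET and return the HTTP status code.
    """
    path = TPR_PATH.format(name=name)
    status = get_status(path)
    if status == 200:
        return path
    if status == 404:
        raise MissingTPRError(
            f"ThirdPartyResource {name!r} not found; ask a cluster "
            "administrator to create it before submitting Spark jobs"
        )
    raise RuntimeError(f"unexpected HTTP status {status} for {path}")
```

Keeping the HTTP call behind a callable also makes it easy to plug in whichever authenticated client the submission code already holds.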

iyanuobidele commented 7 years ago

@foxish @erikerlandson Thanks for the clarification.

iyanuobidele commented 7 years ago

A TPR (for example, the one of kind 'SparkJob' in the issue) itself is not namespaced. The third party objects which are created of that SparkJob we introduce, however, are namespaced. If the TPR is not created for that cluster, Spark will be unable to create Third Party objects in which we want to store state and as you said, it can detect this and complain about it.

We could add a conf value spark.kubernetes.failOnUncreatedTPR (a better name can be used) with true or false values and in the case where the TPR hasn't been created make a decision based on the value. IMO, I think it's okay to create it as long as the user specifies that. Otherwise the user can create it before running the spark job.
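A tiny Python sketch of the decision this flag would drive, assuming the configuration arrives as a string-to-string mapping. The default of "true" and the returned action names are assumptions for illustration; only the flag name comes from the suggestion above.

```python
FLAG = "spark.kubernetes.failOnUncreatedTPR"


def on_missing_tpr(conf):
    """Return "fail" or "create" for the case where the TPR does not exist.

    Defaults to failing, on the assumption that creating a cluster-wide
    resource should be an explicit opt-in.
    """
    raw = conf.get(FLAG, "true")
    fail = str(raw).strip().lower() == "true"
    return "fail" if fail else "create"
```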

tnachen commented 7 years ago

I'd rather we keep it simple for now and not introduce a flag until later, since I don't know if TPR creation is idempotent, and it seems like this TPR needs to be configured correctly as it needs to store state, etc.

erikerlandson commented 7 years ago

If a TPR is created cluster-wide, it almost certainly requires admin privs. On those grounds (and because it is only created once per cluster anyway) I'm in agreement that it's sanest to assume it was created by an admin, prior to submission

yeah, it requires admin privs:

[eje@localhost ~]$ kubectl create -f resource.yaml 
Error from server: error when creating "resource.yaml": User "developer" cannot create extensions.thirdpartyresources at the cluster scope

[eje@localhost ~]$ oc login -u system:admin
Logged into "https://10.0.1.36:8443" as "system:admin" using existing credentials.

[eje@localhost ~]$ kubectl create -f resource.yaml 
thirdpartyresource "cron-tab.stable.example.com" created

erictune commented 7 years ago

Thoughts:

tnachen commented 7 years ago

@erictune mostly agree, but on your point about having a link somewhere to show the driver SparkUI and states: either it is just console output telling you where it is, or we start simple with the TPR and let it just act as a UI querying state in the namespace, and then eventually move to more.

erikerlandson commented 7 years ago

@erictune if the TPR is global, can you delete it without clobbering every spark driver running on the cluster?

Agreed that if TPR isn't defined, then child objects just don't get published.

With some basic loading stats we can do kube-level executor scaling now, as long as we're willing to lose some work due to non-graceful scale down. It's wasteful to some degree, but Spark is designed to work around it. Graceful executor shutdown could be added later, although I'm hoping to try it out to see if it is as straightforward as I believe.

erictune commented 7 years ago

Currently, deleting the TPR (the "schema" for all SparkJobs) will remove the API endpoint and leave the individual SparkJob objects in etcd, but inaccessible. I suspect in time we will make this do a cascading delete like namespace: we just haven't gotten to this. If we do this, we will need to document to admins not to delete TPRs carelessly.

erictune commented 7 years ago

Open question of whether TPRs will be describable via kubectl, either with or without an extension.

iyanuobidele commented 7 years ago

I'm working on a minimal PR that pushes the SparkJob objects into etcd and also updates them at intervals. Here are some of the things I've found out:

Please share some of your thoughts on these findings. @foxish @tnachen @erikerlandson @erictune

erikerlandson commented 7 years ago

My take is that it's not a show-stopper to just PUT some json to the appropriate TPR endpoint. The kubernetes scheduler is intended to be a sub-project of Spark, and so that limits the scope of any additional deps for doing REST calls to the TPR endpoints.

Not sure if it could be done fast enough to help us in the near term, but we could reach out to @prashantchitta or @tazjin, who both offered to work on a PR for https://github.com/fabric8io/kubernetes-client/issues/299

erikerlandson commented 7 years ago

@iyanuobidele I'm not following the reference to "carrying state on etcd," can you expand on that?

iyanuobidele commented 7 years ago

@erikerlandson What I meant is that, ultimately, all TPR instances are stored in etcd, which is how the objects are persisted.

iyanuobidele commented 7 years ago

Here's the behavior I noticed that made me think about an HTTP client

Kubernetes version:

k8s-version

When I try kubectl get SparkJobs, kubectl works only intermittently

spark-jobs

when clearly the TPR exists and an instance has been created. (Here, I tried to post again)

screenshot from 2016-11-18 15-02-35

but doing a get to the endpoint returns the object always.

get-request

tazjin commented 7 years ago

@erikerlandson I haven't had the time to put in a PR to the fabric8 client, but because we had a need to do exactly this (coincidentally for Apache Flink jobs) what we ended up doing is reflecting the configured HTTP client out of the Fabric8 client instance and using that directly.

erikerlandson commented 7 years ago

@tazjin that makes sense - I was thinking we might use whatever HTTP libs fabric8 is using to avoid adding any additional deps

tazjin commented 7 years ago

@erikerlandson You may want to actually reflect the HTTP client instance out of the Fabric8 client; it has the appropriate authorisation and TLS settings configured (which you probably don't want to do manually). See my comment over on the other issue.

erikerlandson commented 7 years ago

@tazjin agreed, it addresses the dependency surface and also benefits from the auth and configuration already established in the client cc/ @iyanuobidele

foxish commented 7 years ago

@iyanuobidele It appears that the behavior you hit with kubectl get is a bug. You can run the same at higher verbosity (-v=6 or -v=9) to try and see what's happening. I can't seem to reproduce it with newer versions of the client/server.

After creating a TPR, there could be a lag between the time of creation and availability of the api endpoint. So we are required to always check before making any requests to the endpoint. The implication of this is that, unavailability of the endpoint doesn't necessarily mean the TPR does not exist. We can implement some type of backoff mechanism for this which is not a problem. See the part labeled note here.

The creation of the TPR itself is a cluster-wide operation and we can assume that it will occur separately and prior to any user trying to launch Spark Jobs. I agree that we should check for its existence, but I don't think we need to retry or wait for it to be available.
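Should a retry be wanted anyway, the backoff mechanism mentioned in the quoted note could be sketched in Python as follows. The function name, the delays, and the attempt count are arbitrary assumptions; `is_available` stands in for whatever check is made against the endpoint.

```python
import time


def wait_for_endpoint(is_available, attempts=5, base_delay=0.5,
                      max_delay=8.0, sleep=time.sleep):
    """Poll `is_available()` with exponential backoff.

    Returns True as soon as the endpoint responds, or False once all
    attempts are used up. `sleep` is injectable so callers can test
    the schedule without actually waiting.
    """
    delay = base_delay
    for attempt in range(attempts):
        if is_available():
            return True
        if attempt < attempts - 1:  # no point sleeping after the last try
            sleep(delay)
            delay = min(delay * 2, max_delay)
    return False
```

A False result only says the endpoint did not become reachable in time; as the note points out, it does not prove that the TPR is absent.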

While it is not possible (due to a bug to be fixed in 1.6) to use kubectl edit/patch on TPRs, it is still possible to PUT/PATCH the resource. The following works, for example:

TPR:

metadata:
  name: spark-job.apache.io
  labels:
    resource: spark-job
    object: spark
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "A resource that manages a spark job"
versions:
  - name: v1

Sample object

apiVersion: "apache.io/v1"
kind: "SparkJob"
metadata:
  name: "spark-job-1"
spec:
  image: "driver-image"
  state: "completed"
  num-executors: 10

curl -k -H "Authorization: Bearer XXX" -H "Content-Type: application/json-patch+json" -X PATCH --data '[{"op": "replace", "path": "/spec/num-executors", "value": "13"}]' https://<k8s-master>/apis/apache.io/v1/namespaces/default/sparkjobs/spark-job-1

It should be possible to use the reflected HTTP client for this as @tazjin mentioned.
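For readers unfamiliar with the `application/json-patch+json` body used in the curl command, here is a short Python sketch that builds a `replace` operation and applies it locally to the sample object's structure. It handles only `replace` on nested dictionaries, which is all the example needs; it is not a full RFC 6902 implementation, and the function names are hypothetical. The sketch sends the number 13 rather than the string "13" so the field keeps its original type.

```python
import copy


def build_replace_patch(path, value):
    """Return a JSON Patch document with a single replace operation."""
    return [{"op": "replace", "path": path, "value": value}]


def _unescape(token):
    # JSON Pointer escaping: "~1" means "/", "~0" means "~" (in that order).
    return token.replace("~1", "/").replace("~0", "~")


def apply_replace(document, patch):
    """Apply replace-only patch operations to a copy of `document`."""
    result = copy.deepcopy(document)
    for op in patch:
        if op["op"] != "replace":
            raise ValueError(f"unsupported op: {op['op']}")
        if not op["path"].startswith("/"):
            raise ValueError("path must start with '/'")
        tokens = [_unescape(t) for t in op["path"].split("/")[1:]]
        target = result
        for key in tokens[:-1]:
            target = target[key]
        if tokens[-1] not in target:
            raise KeyError(op["path"])  # replace requires an existing member
        target[tokens[-1]] = op["value"]
    return result
```

Serializing the result of `build_replace_patch("/spec/num-executors", 13)` with `json.dumps` gives a body of the same shape as the one passed to curl via `--data`.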

iyanuobidele commented 7 years ago

Moving on to the next set of things addressing this issue

  1. completing the two-way relationship we discussed for the TPR instance, that is, deleting/destroying the SparkJob object should kill the Spark job.

  2. adding support for spark-submit --status/kill

My initial thoughts on (1) above:

Adding support for spark-submit --status/kill should be straightforward, and I will start looking into that.

What are your thoughts on (1)? /cc @foxish @erikerlandson

foxish commented 7 years ago

(1) seems like something that should be done using a watch. I'm not sure if the fabric8 library allows us to watch TPR instances, but if not, we should be able to roll our own, similar to the current code surrounding TPRs. We may be able to get by without this watching/two-way relationship, however, for implementing (2), which is necessary for the first PR. We can have the SparkJob instance record the names of the driver and executor pods which are launched onto the cluster. Upon executing spark-submit --kill, look up the SparkJob instance, find the corresponding driver (and executor pods if they've been created) and issue DELETE calls to k8s directly. If a cluster administrator does not create the TPR, then users will be unable to use spark-submit to find the status of or control their jobs and will have to fall back to using kubectl or the kubernetes dashboard directly.
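The --kill lookup described above could be sketched in Python like this. The `driver-pod` field comes from the sample object in the issue; the `executor-pods` field and the function name are assumptions, since the thread does not fix how executor names would be recorded. Pod paths use the core `/api/v1/namespaces/<ns>/pods/<name>` layout.

```python
def kill_requests(spark_job):
    """Return the (method, path) DELETE calls needed to kill one SparkJob.

    The driver is listed first so that it cannot request replacement
    executors while the executors are being removed.
    """
    namespace = spark_job["metadata"]["namespace"]
    pods = []
    driver = spark_job.get("driver-pod")
    if driver:
        pods.append(driver)
    # "executor-pods" is a hypothetical field holding a list of pod names.
    pods.extend(spark_job.get("executor-pods", []))
    return [("DELETE", f"/api/v1/namespaces/{namespace}/pods/{pod}")
            for pod in pods]
```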

I've been thinking about the SparkJob resource. Tying it to the driver's lifecycle may not be the right solution, because we want to report on the status of a SparkJob even when the driver has yet to enter the running state. In the current implementation, we don't create a SparkJob resource until the driver starts to run, which may take a while in a cluster with resource constraints. I think the right way would be to create the SparkJob object at the time we create the driver (from the client side), but allow the driver to clean it up eventually when it exits.

foxish commented 7 years ago

@iyanuobidele We're now moving all development to our own org. You're one of the owners of the org as well. Future PRs should be made against https://github.com/apache-spark-on-k8s/spark

Thanks!

iyanuobidele commented 7 years ago

Got it. Thanks !

@foxish Is there a SIG or some avenue for meetings/discussions for the org?

foxish commented 7 years ago

Oops, missed this. @iyanuobidele Not yet. But I think we can reuse and reinvigorate https://groups.google.com/forum/#!forum/kubernetes-sig-big-data. Will work on getting it set up and active again.

iyanuobidele commented 7 years ago

@foxish sounds great. Thanks.