apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.31k stars 14.09k forks source link

Track SparkSubmitHook Yarn Cluster application with Yarn CLI #24171

Open tokoko opened 2 years ago

tokoko commented 2 years ago

Description

SparkSubmitHook should track yarn cluster-mode application status with yarn CLI rather than rely on spark-submit process logs. This would cut back on excessive memory usage and also make it much easier to make the operator deferrable later on.

Use case/motivation

While running most of our Spark workloads in Yarn cluster mode using SparkSubmitHook, we observed that celery workers were consistently low on memory. The main driver for the high memory consumption were spark-submit processes started from SparkSubmitHook, that took about 500mb of memory even though in yarn cluster mode they were doing essentially next to none of actual work. We refactored the hook to kill spark-submit process right after Yarn accepts the application and track the status with yarn application -status calls similar to how spark standalone mode is being tracked.

Another motivation for the change is to prepare the operator to be made deferrable later on. Polling from external java process that needs to be kept alive until operator exits can't be made deferrable. Using Yarn CLI for polling would fit easily with how deferrable operators work.

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

vchiapaikeo commented 10 months ago

Happy to give this a shot!

tokoko commented 10 months ago

@vchiapaikeo Hey, I went through several iterations of this since this issue. The version I'm trying to adopt right now is deferrable and uses yarn rest api rather than yarn cli, because making aiohttp calls from triggerer is a lot simpler than launching subprocesses. I'm using a modified version of HttpTrigger (the one that can listen for specific sentinel values being returned as response content) to listen for application state changes. I can share the code later if that helps.

vchiapaikeo commented 10 months ago

That'd be great @tokoko - I'm familiar with turning http processes into deferrable. My concern here is that the SparkSubmitHook supports both the client and cluster deploy modes and seemingly 3 cluster manager types. This solution would only be applicable with the Yarn cluster manager. I'd also likely develop locally with standalone mode as well so would not be able to make this rest call.

Do you know of a comprehensive solution to track driver status that would work for standalone, k8's, and yarn? Also, I believe it is possible that users do not have the yarn api gateway exposed. In GCP dataproc, I recall that we needed to modify a setting so that we could access the rest api via Apache Knox. --> https://cloud.google.com/dataproc/docs/concepts/accessing/dataproc-gateways

tokoko commented 10 months ago

@vchiapaikeo that's right. Full solution is really tricky here, that's why I chose to keep the implementation in-house, it was mostly just suited to our environment only :) Running client and local jobs in deferrable mode doesn't really make too much sense, I think. I would just throw an error in that case.

For cluster mode jobs, there's no comprehensive solution AFAIK. You can track standalone jobs with spark-submit itself, but you can't do the same in case of yarn and k8s. For them you would either use respective clients (yarn and kubectl) or rest api calls. A web of if-else clauses is probably the only way to go here.

That's true, tracking YARN with rest calls assumes that rest api is exposed, which might or might not be the case depending on the environment. in case of k8s, it's probably safe to assume that both kubectl and rest api will be available.

vchiapaikeo commented 10 months ago

Ooo @tokoko , I just recently learned that Spark master exposes a rest API with spark.master.rest.enabled true. Can you check if that's the case on your yarn deployment? If possible to access on Yarn, maybe we can use this as a comprehensive solution to obtain status?

I am running Spark on minikube following these instructions.

image

Rest API:

image

Don't mind the port numbers in the screenshot. I am using nodeport / minikube.

tokoko commented 10 months ago

Depends on what you mean exactly. YARN always exposes rest api, it just might or might not be accessible from airflow worker because of network security policies. The address can be configured as yarn.resourcemanager.webapp.address in yarn-site.xml.

So, all three cluster managers do expose rest apis, but the specifics of the apis are different. For example in YARN you have to call http://{yarn_address}/ws/v1/cluster/apps/{application_id}/state to get the necessary state. Take a look at this gist https://gist.github.com/tokoko/f559fb164c9433bb09c9618bf2949f33.

tirkarthi commented 4 weeks ago

I just want to add that we have been using yarn rest API to fetch application status in our custom deferrable operators related to spark on yarn and it has been working fine. We had also implemented the logic to kill the application when the task gets cleared or marked as success/fail from UI.

https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerRest.html#Application_API