Open assaf-xm opened 1 year ago
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
@assaf-xm It seems like this ability was deemed out of scope for Airflow as per the PR comments. As the committer, perhaps @blcksrx could give some more context?
Well, if you look closely at the older PR you will find the delete operation defined in the KubernetesHook, and it also produces logs about SparkApplication. Besides the fact that creating_namespaced_object is not only about creating SparkApplications, it breaks the single responsibility principle of the creating_namespaced_object method. In addition, since it deletes that namespaced_object, it rewrites the object history in k8s and it cannot be understood why that object failed the first time!
if "name" in body_dict["metadata"]:
try:
api.delete_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
name=body_dict["metadata"]["name"],
)
self.log.warning("Deleted SparkApplication with the same name")
except client.rest.ApiException:
self.log.info("SparkApplication %s not found", body_dict["metadata"]["name"])
It's better to use simple macros on the object name, such as name-{{ ds }}, and that would solve the problem.
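A hypothetical sketch of that templated-name workaround, assuming the application_file is a Jinja-rendered YAML manifest; the file name, ids, and namespace below are made up for illustration:

```python
# Illustrative only: a metadata.name like "my-spark-app-{{ ds }}" in the templated
# manifest produces a uniquely named SparkApplication per DAG run.
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

with DAG("spark_app_example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    submit = SparkKubernetesOperator(
        task_id="submit_spark_app",
        namespace="spark",
        # spark_app.yaml contains, among other fields:
        #   metadata:
        #     name: my-spark-app-{{ ds }}
        application_file="spark_app.yaml",
        kubernetes_conn_id="kubernetes_default",
    )
```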
@blcksrx , note that the removal of this code breaks applications that use constant names, so having the old behavior as an option would be very useful for some of the users.
Working with name-{{ ds }} could be a solution; however, I find it less attractive since it can 'pollute' the cluster with a huge number of old spark applications, and finding the last spark application that ran is also harder this way.
@assaf-xm From the compatibility perspective, yes, you are absolutely right; on the other hand, the developer should do the cleanup themselves.
@blcksrx , is there any easy way to clean up spark applications from Airflow DAGs? In any case, please consider adding this as an optional parameter to SparkKubernetesOperator.
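For reference, a rough sketch of one way such a cleanup could be done from a DAG task by calling the kubernetes client directly; the group/version/plural values follow the spark-on-k8s operator CRD, while the function name, application name, and namespace are illustrative:

```python
# Sketch only: delete a leftover SparkApplication, ignoring the case where it does not exist.
from kubernetes import client, config

def delete_spark_app(name: str, namespace: str = "spark") -> None:
    config.load_incluster_config()  # or config.load_kube_config() when running outside the cluster
    api = client.CustomObjectsApi()
    try:
        api.delete_namespaced_custom_object(
            group="sparkoperator.k8s.io",
            version="v1beta2",
            plural="sparkapplications",
            namespace=namespace,
            name=name,
        )
    except client.rest.ApiException as exc:
        if exc.status != 404:  # a missing application is fine; re-raise anything else
            raise
```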
@assaf-xm You are welcome to submit a PR for this and reference the conversation above at which point it will be reviewed and decided upon. Otherwise this will remain as an open "issue" unless someone else picks it up.
Hi,
this change is very important for our usage. We had to change a lot of the DAGs to manage the lifecycle of the SparkApp. Why not add an option to the operator to let the users decide whether to delete that SparkApp or not before submitting?
This is some kind of breaking change, but there is nothing about it in the release notes. We had to dig into the source code to debug it ourselves and then Google the issue, which led me here. I think it's better to improve the release notes in the future.
@renxunsaky feel free to open PRs to address the needed issues
We have no intention of releasing code that has breaking changes without info, but like any software, sometimes there are regressions. Since you found the issue and are knowledgeable about it, you seem to be the best person to handle it.
I'd be happy to review and cut a release as soon as issues are fixed in main branch
> This is some kind of breaking change, but there is nothing about it in the release notes. We had to dig into the source code to debug it ourselves and then Google the issue, which led me here. I think it's better to improve the release notes in the future.
We always release RC versions and call for the community to test the code. If community reports regression we do not release till issue is fixed. I welcome you to participate in the effort of testing releases.
@renxunsaky @eladkal Just to clarify: in https://github.com/apache/airflow/pull/31798 I fixed the compatibility issue, and the operator works like before (deleting the app with the same name).
When a user adds the argument watch=True to the operator, the operator monitors the whole state of the SparkApplication, streams the logs to the output, and deletes the k8s pod.
The whole idea is that an operator (such as a MySQL transfer operator) is responsible for opening the connection, transferring the data, monitoring the state, and finally closing the connection (doing the cleanup).
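A minimal usage sketch of that mode, assuming the watch argument from the PR above; the task id and manifest path are illustrative, and the task is assumed to live inside a DAG:

```python
# Hypothetical example of the watch mode described above: the operator submits the
# SparkApplication, follows its state, streams the driver logs, and cleans up the pod.
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

run_spark = SparkKubernetesOperator(  # assumed to be defined inside a DAG
    task_id="run_spark_app",
    namespace="spark",
    application_file="spark_app.yaml",
    kubernetes_conn_id="kubernetes_default",
    watch=True,  # monitor the application, stream logs, delete the pod when it finishes
)
```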
If so, then @blcksrx can you clarify what the task on this issue is?
@eladkal @renxunsaky
The old KubernetesHook used to delete the CRD application in create_custom_object. In other words, it deleted the object and then created a new one.
So if the application name was always the same, there would always be exactly one sparkApp, but if the user used something like:
name: spark-app-{{ds}}
there would be a lot of sparkApps, and it
My suggestion is to adopt the approach that is used in the pod_manager:
https://github.com/blcksrx/airflow/blob/fe360cb11d98305c2b7a5ba090412e4796887082/airflow/providers/cncf/kubernetes/utils/pod_manager.py#L777-L782C50
@eladkal, @blcksrx , the https://github.com/apache/airflow/pull/31798 PR doesn't fix the reported compatibility issue: the spark application isn't deleted on completion, and it also isn't automatically deleted when a new spark application is triggered again (causing the '409 - Already exists' error). As suggested above, adding another option like 'override_existing=True' would be very useful for us as well, preventing the need to handle multiple instances of the same application and keeping our existing dags working.
@assaf-xm Got your point. I'm gonna fix it in a new PR ASAP. Appreciate the review in advance!
@blcksrx Hi, we have the same problem after upgrading Airflow. Do you have any estimated time for fixing this issue?
@JavadHosseini can you help with reviewing https://github.com/apache/airflow/pull/36268 ? does it solve your issue?
Hi guys, any update on that? It would help me as well.
Any update on this?
Apache Airflow version
2.6.2
What happened
Changes to the SparkKubernetesOperator which were introduced in https://github.com/apache/airflow/pull/21092 were reverted by the changes added in https://github.com/apache/airflow/pull/29977
Until 2.6.2, when a spark application was created with the same name as a previous application, the old application (pods etc.) was automatically deleted and the new application started (with the warning 'Deleted SparkApplication with the same name' in the logs). After 2.6.2, this scenario causes an error when starting the new application (409 - Already exists), and the previous application needs to be manually deleted (there is no easy way to delete it programmatically from airflow dags).
Was this behavior change made on purpose? I would suggest adding a parameter to control the desired behavior.
What you think should happen instead
Changes to the SparkKubernetesOperator which were introduced in https://github.com/apache/airflow/pull/21092 were reverted by the changes added in https://github.com/apache/airflow/pull/29977
How to reproduce
Start a new spark application with the same name as the previous one; compare 2.6.2 vs 2.5.3.
Operating System
EKS
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct