
Re-launch the same spark operator is failing with error "sparkapplications.sparkoperator.k8s.io \"smoke-test\" already exists" #35395

Open · xs2tarunkukreja opened this issue 1 year ago

xs2tarunkukreja commented 1 year ago

Apache Airflow version

2.7.2

What happened

I am using the following code to launch a PySpark job:

t1 = SparkKubernetesOperator(
    task_id='env-check',
    namespace="spark-operator",
    application_file='pipeline.yaml',
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='env_app_monitor',
    namespace="spark-operator",
    attach_log=True,
    application_name="{{ task_instance.xcom_pull(task_ids='env-check')['metadata']['name'] }}",
    dag=dag,
)

The first run works fine, but every subsequent run fails with sparkapplications.sparkoperator.k8s.io "smoke-test" already exists, because the SparkApplication created by the first run still exists in the cluster under the same metadata.name.

What you think should happen instead

The PySpark job should run successfully on every subsequent run as well.

How to reproduce

Just run a Spark Application twice.

Operating System

K8S

Versions of Apache Airflow Providers

2.7.2

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?


zhangkuantian commented 12 months ago

When the Airflow SparkKubernetesOperator submits an application, it does not check whether an old SparkApplication with the same name already exists; if one is present, the error above is thrown. This can be avoided by giving each run a unique name in the application file's YAML configuration, e.g.:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "smoke-test-{{ execution_date.strftime('%Y%m%d') }}-{{ task_instance.try_number }}"
spec:
  xxxxx
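
This works because application_file is a templated field on SparkKubernetesOperator, so Jinja expressions like the ones above are rendered per task instance before the manifest is submitted. As an alternative (a minimal sketch, not from this thread; the cleanup task id is hypothetical), you can make re-runs idempotent by deleting any leftover SparkApplication before resubmitting, for example with a BashOperator:

from airflow.operators.bash import BashOperator

# Hypothetical cleanup task: delete the leftover SparkApplication (if any)
# before t1 resubmits it. --ignore-not-found makes the delete a no-op on
# the very first run, when nothing exists yet.
t0 = BashOperator(
    task_id='cleanup-spark-app',
    bash_command=(
        'kubectl delete sparkapplication smoke-test '
        '-n spark-operator --ignore-not-found'
    ),
    dag=dag,
)

t0 >> t1 >> t2

This assumes the Airflow workers have kubectl and credentials for the cluster; the unique-name approach above avoids that requirement.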