bitnami / charts

Bitnami Helm Charts
https://bitnami.com
Other
9.03k stars 9.22k forks source link

[bitnami/airflow] airflow task using sparkKubernetesOperator succeeds even if sparkjob is failed. #24446

Closed naveenbussari closed 7 months ago

naveenbussari commented 8 months ago

Name and Version

bitnami/airflow:2.6.0

What architecture are you using?

amd64

What steps will reproduce the bug?

  1. Deploy bitami/airflow 2.6.0 in k8s.
  2. Run an airflow dag using sparkKubernetesOperator and SparkKubernetesSensor The dag
    
    from airflow import DAG
    from datetime import timedelta, datetime
    from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
    from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
    import os

default_args={ 'depends_on_past': False, }

with DAG( 'spark-dag', default_args=default_args, description='sample spark dag', schedule_interval=timedelta(days=1), start_date=datetime(2022, 11, 17), catchup=False, tags=['maps'] ) as dag: t1 = SparkKubernetesOperator( task_id='sample-spark', application_file="spark/sample-spark/sample-spark.yaml", namespace="incentives", do_xcom_push=True, dag=dag )

t2 = SparkKubernetesSensor( task_id='spark-sensor', namespace="incentives", application_name="{{ task_instance.xcom_pull(task_ids='sample-spark')['metadata']['name'] }}", attach_log=True, poke_interval=5, dag=dag ) t1 >> t2

3. To create this issue, intentionally failing a sparkjob. In the spark.yaml , I am triggering a python job to read data(which should fail due to wrong configs)

When the dag is triggered,

- First sparkKubernetesOperator task is triggered.  In the kubernetes cluster, pod for sparkKubernetesOperator task is created, then the sparkapp, driver and executor pods.

The **driver pod runs into Error** state as expected. But the task is marked **success**
The logs of sparkKubernetesOperator task:
<img width="1768" alt="image" src="https://github.com/bitnami/charts/assets/88088284/e55e220c-d07a-4626-9ce7-023a40771a24">

The **xcom** returned null
<img width="863" alt="image" src="https://github.com/bitnami/charts/assets/88088284/9be2ce73-5c63-41d5-9cd4-f056b4cf66d9">

- Second the sensor task is triggered  and in Kubernetes end pod for sensor task is created. But the pod runs into Error state.
- The logs of sensor task
<img width="882" alt="image" src="https://github.com/bitnami/charts/assets/88088284/6e03db0b-503e-4f97-aae9-88765cdadd3c">
<img width="1792" alt="image" src="https://github.com/bitnami/charts/assets/88088284/1251071a-54e3-4bd7-83ec-13c2fe70dda1">

The spark.yaml has some labels.
```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
 name: spark-pi-{{ ti.job_id }}
 namespace: incentives
 labels:
    spark_name: spark-pi-{{ ti.job_id }}
    dag_name: sample-spark
    environment: "prod"
    cloud: "aws"
    tier: "t1"
    team: "maps"

these dag and spark.yaml were working fine in the 2.0.2 version of airflow

Are you using any custom parameters or values?

No.

What is the expected behavior?

Expecting behaviour would be sparkKubernetesOperator task fails when sparkjob fails. At least the the xcom should return something related to the task.

What do you see instead?

We see a null value in xcom. SparkKubernetesOperator task succeeds even though the sparkjob failed.

carrodher commented 8 months ago

The issue may not be directly related to the Bitnami container image or Helm chart, but rather to how the application is being utilized or configured in your specific environment.

Having said that, if you think that's not the case and are interested in contributing a solution, we welcome you to create a pull request. The Bitnami team is excited to review your submission and offer feedback. You can find the contributing guidelines here.

Your contribution will greatly benefit the community. Feel free to reach out if you have any questions or need assistance.

If you have any questions about the application itself, customizing its content, or questions about technology and infrastructure usage, we highly recommend that you refer to the forums and user guides provided by the project responsible for the application or technology.

With that said, we'll keep this ticket open until the stale bot automatically closes it, in case someone from the community contributes valuable insights.

naveenbussari commented 8 months ago

For now fixed this issue by changing the spark_kubernetes_operator.py . I've modified the execute method as follows to fail the task when the driver pod ran into Error state

           elif event["type"] == "MODIFIED":
                pod_status_info = event['object']
                self.log.info(f"printing type of object: {type(pod_status_info)}")

                self.log.info(f"\n printing the event : {pod_status_info.status.phase}")
                if pod_status_info.status.phase == "Succeeded":
                    self.log.info(f"\n sparkapplication is {name} successfully completed")
                    break
                elif pod_status_info.status.phase == "Failed":
                    self.log.info(f"\n sparkapplication {name} is failed")
                    raise AirflowException(f"SparkApplication {name} failed")
                else:
                    continue
            else:
                break

But still not able to fetch xcom returns though.

github-actions[bot] commented 7 months ago

This Issue has been automatically marked as "stale" because it has not had recent activity (for 15 days). It will be closed if no further activity occurs. Thanks for the feedback.

github-actions[bot] commented 7 months ago

Due to the lack of activity in the last 5 days since it was marked as "stale", we proceed to close this Issue. Do not hesitate to reopen it later if necessary.