airflow-helm / charts

The User-Community Airflow Helm Chart is the standard way to deploy Apache Airflow on Kubernetes with Helm. Originally created in 2017, it has since helped thousands of companies create production-ready deployments of Airflow on Kubernetes.
https://github.com/airflow-helm/charts/tree/main/charts/airflow
Apache License 2.0

Task with Branch Operator is following both branch #883

Open ramjeeanna opened 2 months ago

ramjeeanna commented 2 months ago

Chart Version

airflow-8.9.0

Kubernetes Version

1.28.9

Client Version: v1.31.0
Kustomize Version: v5.4.2
Server Version: v1.28.9
WARNING: version difference between client (1.31) and server (1.28) exceeds the supported minor version skew of +/-1

Helm Version

version.BuildInfo{Version:"v3.14.2", GitCommit:"", GitTreeState:"clean", GoVersion:"go1.21.11"}

Description

When multiple DAG runs are in progress at the same time, the value of kwargs['param'] is sometimes wrong.

Create an Airflow DAG with a branch operator task like the one below:

    pre_check = BranchPythonOperator(task_id='pre_validlate', python_callable=pre_validate_fn)

The graph is:

    init_report >> pre_check >> data_extraction >> end
    pre_check >> remove_table

The branch operator's callable, pre_validate_fn, decides which branch to follow. The code is:

    follow_branch = 'data_extraction'
    if not kwargs['param']['delta']:
        follow_branch = ['data_extraction', 'remove_table']
    print(follow_branch)
    return follow_branch

If delta is not set, the DAG removes the table and also runs the extraction. In our case, even when the input parameter is passed with 'delta' set to True, the condition is still followed and both branches run.
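To help narrow this down, here is a minimal sketch of the same branching logic pulled into a plain function that can be exercised without a scheduler. The helper name choose_branches and the extra diagnostic print are assumptions added for illustration and are not part of the original DAG; the sketch also assumes that a missing 'delta' key should behave like a falsy one. It returns a list in both cases, which BranchPythonOperator accepts just like a single task_id.

```python
def choose_branches(param):
    """Return the task_ids to follow, mirroring the logic in pre_validate_fn.

    `param` is the dict the issue reads as kwargs['param'].
    """
    # Assumption: a missing 'delta' key is treated the same as a falsy value.
    if not param.get('delta'):
        return ['data_extraction', 'remove_table']
    return ['data_extraction']


def pre_validate_fn(**kwargs):
    """Branch callable; would be passed as python_callable to the operator."""
    param = kwargs['param']
    branches = choose_branches(param)
    # Print the raw value and its type so the task log shows exactly what
    # this run received when the branch decision was made.
    delta = param.get('delta')
    print(f"delta={delta!r} type={type(delta).__name__} -> {branches}")
    return branches
```

If the task log for an affected run shows a falsy delta even though the run was triggered with True, that would suggest the wrong parameters reached the callable rather than a fault in the branching itself.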

When this faulty execution happens, the remove_table task is shown as success in the UI and the table is actually deleted. However, the log folder for task_id=remove_table is not created under the corresponding run_id folder.

Relevant Logs

NA

Custom Helm Values

NA