apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Task with ALL_DONE trigger rule gets skipped #40535

Closed pramod-prabhakar-kapase-db closed 4 months ago

pramod-prabhakar-kapase-db commented 4 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3

What happened?

A PythonOperator task with TriggerRule.ALL_DONE is skipped.

What you think should happen instead?

The task should trigger once all upstream tasks have completed, whether they failed, succeeded, or were skipped.
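
For context, `ALL_DONE` runs a task once every upstream task has reached a terminal state (success, failed, or skipped). A minimal sketch of that expectation, separate from the reporter's DAG below (hypothetical DAG and task names):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def fail():
    raise RuntimeError("upstream fails on purpose")


with DAG("all_done_demo", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    failing = PythonOperator(task_id="failing", python_callable=fail)
    ok = PythonOperator(task_id="ok", python_callable=lambda: print("ok"))
    # cleanup runs even though `failing` fails, because ALL_DONE only waits
    # for every upstream task to finish, regardless of its final state.
    cleanup = PythonOperator(
        task_id="cleanup",
        python_callable=lambda: print("cleanup"),
        trigger_rule=TriggerRule.ALL_DONE,
    )
    [failing, ok] >> cleanup
```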

How to reproduce

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule

# used to factorize the code and avoid repetition
report_id = 99999

stageConfig = {}
stageConfig['name'] = 'aaaa'
stageConfig['address'] = '11111'

stageConfig1 = {}
stageConfig1['name'] = 'bbbb'
stageConfig1['address'] = '22222'

stageConfig2 = {}
stageConfig2['name'] = 'cccc'
stageConfig2['address'] = '33333'

test_array = []
test_array.append(stageConfig)
test_array.append(stageConfig1)
test_array.append(stageConfig2)

tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'Weekday',
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
    schedule="*/5 * * * *",
)


# returns the week day (monday, tuesday, etc.)
def get_day(**kwargs):
    kwargs['ti'].xcom_push(key='day', value=datetime.now().weekday())


# returns the name id of the task to launch (task_for_monday, task_for_tuesday, etc.)
def branch(ti):
    print("$$$$$$$$$$")
    # print(ti.upstream_task_ids)  # Not available

    print(ti.task.get_direct_relatives(True))
    print(ti.task.get_direct_relative_ids(True))

    upstream_task_list = []
    upstream_task_id_set = ti.task.get_direct_relative_ids(True)
    for task_id in upstream_task_id_set:
        print(task_id)
        upstream_task_list.append(ti.get_dagrun().get_task_instance(task_id=task_id))

    upstream_task_status = TaskInstanceState.SUCCESS
    for t in upstream_task_list:
        if t.state == TaskInstanceState.FAILED:
            upstream_task_status = TaskInstanceState.FAILED
            break

    downstream_task_list = ti.task.get_direct_relative_ids(False)

    if upstream_task_status == TaskInstanceState.SUCCESS:
        downstream_task_list.remove("move_file")
        return downstream_task_list
    else:
        return "move_file"


def move_file_and_info():
    print("move_file_and_info")


move_file = PythonOperator(
    task_id='move_file',
    trigger_rule=TriggerRule.ALL_DONE,
    python_callable=move_file_and_info,
    provide_context=True,
    dag=dag,
)


def bq_insert_call():
    print("bq_insert....")


def df_task_call():
    print("df_task....")


# One dummy operator for each week day, all branched to the fork
# prev_task = fork
prev_task = None
for index, day in enumerate(test_array):
    bq_insert = PythonOperator(
        task_id='task_bq_insert' + day['name'],
        dag=dag,
        python_callable=bq_insert_call,
        provide_context=True,
    )
    t = PythonOperator(
        task_id='taskfor' + day['name'],
        dag=dag,
        python_callable=df_task_call,
        provide_context=True,
    )

    if prev_task:
        prev_task.set_downstream(bq_insert)

    bq_insert.set_downstream(t)

    if index + 1 != len(test_array):
        fork1 = BranchPythonOperator(
            task_id='branching' + day['name'],
            python_callable=branch,
            provide_context=True,
            dag=dag,
        )
        # prev_task.set_downstream(t)
        fork1.set_downstream(move_file)
        t.set_downstream(fork1)
        prev_task = fork1
    else:
        # prev_task.set_downstream(t)
        prev_task.set_downstream(move_file)
        t.set_downstream(move_file)
```
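
Traced out, the loop wires the tasks into this shape (task ids built from the three `name` values in `test_array`):

```
task_bq_insertaaaa >> taskforaaaa >> branchingaaaa >> [task_bq_insertbbbb, move_file]
task_bq_insertbbbb >> taskforbbbb >> branchingbbbb >> [task_bq_insertcccc, move_file]
task_bq_insertcccc >> taskforcccc >> move_file
```

Note that `move_file` is a direct downstream of both branch operators.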

Operating System

GCP managed service

Versions of Apache Airflow Providers

GCP

Deployment

Official Apache Airflow Helm Chart

Deployment details

NA

Anything else?

NA

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 4 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

pramod-prabhakar-kapase-db commented 4 months ago

[screenshot attached]

eladkal commented 4 months ago
> What happened?
> A PythonOperator task with TriggerRule.ALL_DONE is skipped.
>
> What you think should happen instead?
> The task should trigger once all upstream tasks have completed, whether they failed, succeeded, or were skipped.

How is this related to the image you shared and to your code? You have a branch operator upstream, which means one branch executes while the other is skipped. In your example, `move_file` is skipped because `branchingaaaa` decided to skip it.
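
A minimal sketch of that behavior (hypothetical DAG and task names): when the branch does not return `join`, the branch operator itself marks `join` as skipped, and `ALL_DONE` does not override that, because the skip comes from the branching decision rather than from trigger-rule evaluation.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG("branch_skip_demo", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    # Always follow "chosen"; "join" is a direct downstream of the branch
    # that is never returned by the callable.
    pick = BranchPythonOperator(task_id="pick", python_callable=lambda: "chosen")
    chosen = PythonOperator(task_id="chosen", python_callable=lambda: print("chosen"))
    join = PythonOperator(
        task_id="join",
        python_callable=lambda: print("join"),
        trigger_rule=TriggerRule.ALL_DONE,  # does not rescue a task the branch skipped
    )
    pick >> [chosen, join]
    chosen >> join
```

If `move_file` should always run at the end, one common fix is to have the branch callable include `move_file` in the task ids it returns whenever it should execute, rather than relying on `ALL_DONE` to rescue it.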

If you still believe there is a bug, please provide a clear description of the issue and a minimal reproducible example.