apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Trigger Rule ONE_FAILED does not work in task group with mapped tasks #34023

Open benbuckman opened 1 year ago

benbuckman commented 1 year ago

Apache Airflow version

2.7.0

What happened

I have the following DAG:

from __future__ import annotations
from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule

@task
def get_records() -> list[str]:
    return ["a", "b", "c"]

@task
def submit_job(record: str) -> None:
    pass

@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")

@task
def deliver_record(record: str) -> None:
    pass

@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass

@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str):
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)

demo_trigger_one_failed()

But this does not work. handle_failed_delivery is skipped, and (based on the UI) it's skipped very early, before it can know if the upstream tasks have completed successfully or errored.

Here's what I see, progressively (see How to reproduce below for how I got this):

[Screenshots: started ... skipped too early ... fake_sensor about to fail ... done, handle_failed_delivery didn't run]

If I remove the task group and instead do,

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )

then it does the right thing:

[Screenshots: started ... waiting ... done, handle_failed_delivery triggered correctly]

What you think should happen instead

The behavior with the task group should be the same as without the task group: the handle_failed_delivery task with trigger_rule=TriggerRule.ONE_FAILED should be run when the upstream fake_sensor task fails.

How to reproduce

  1. Put the above DAG at a local path, /tmp/dags/demo_trigger_one_failed.py.

  2. docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash

  3. In the container:

    airflow db init
    airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
    airflow scheduler --daemon
    airflow webserver
  4. Open http://localhost:8080 on the host. Log in with airflow / airflow. Run the DAG.

I tested this with:

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

n/a

Deployment

Other Docker-based deployment

Deployment details

This can be reproduced using standalone Docker images; see the repro steps above.

Anything else

I wonder if this is related to (or fixed by?) https://github.com/apache/airflow/issues/33446 -> https://github.com/apache/airflow/pull/33732 ? (The latter was "added to the Airflow 2.7.1 milestone 3 days ago." I can try to install that pre-release code in the container and see if it's fixed.) edit: nope, not fixed

Are you willing to submit PR?

Code of Conduct

benbuckman commented 1 year ago

I was curious if https://github.com/apache/airflow/pull/33732 (https://github.com/apache/airflow/commit/fe27031382e2034b59a23db1c6b9bdbfef259137) fixes the same issue I'm describing here. That fix is on main.

  1. In my git clone of this repo,

    git checkout main
    git pull

    (I'm at 3ae6b4e86fe807c00bd736c59df58733df2b9bf9)

    docker build . -f Dockerfile --pull --tag airflow-trigger-rule-test:0.0.1
    docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 airflow-trigger-rule-test:0.0.1 bash
  2. "In the container:" step 3 from above, run the DAG ...

Nope – it exhibits the same incorrect behavior of skipping handle_failed_delivery before the task group has finished, and not respecting the trigger_rule.

RNHTTR commented 1 year ago

Seems as though it only affects mapped tasks. That is, it runs as expected if you replace deliver_record_task_group.expand(record=records) with deliver_record_task_group(record=records).
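
For reference, a minimal sketch of that non-mapped variant, reusing the task and task-group definitions from the DAG above (the dag_id here is just illustrative):

@dag(
    dag_id="demo_trigger_one_failed_unmapped",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed_unmapped() -> None:
    records = get_records()
    # Calling the task group directly instead of expanding it: `record` receives
    # the whole XComArg from get_records(), the group is instantiated once, and
    # ONE_FAILED behaves as expected.
    deliver_record_task_group(record=records)

demo_trigger_one_failed_unmapped()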

RNHTTR commented 1 year ago

Curiously, the Gantt chart shows the task group as having been queued for 24 hours:

[Screenshot: Gantt chart]

RNHTTR commented 1 year ago

As @benbuckman reported, the task was skipped before submit_job or fake_sensor ran.

debug logs:

[2023-09-02T23:21:23.108+0000] {retries.py:92} DEBUG - Running SchedulerJobRunner._schedule_all_dag_runs with retries. Try 1 of 3
[2023-09-02T23:21:23.110+0000] {scheduler_job_runner.py:1485} DEBUG - DAG demo_trigger_one_failed not changed structure, skipping dagrun.verify_integrity
[2023-09-02T23:21:23.111+0000] {dagrun.py:740} DEBUG - number of tis tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 6 task(s)
[2023-09-02T23:21:23.112+0000] {dagrun.py:761} DEBUG - number of scheduleable tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 2 task(s)
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1159} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]>
[2023-09-02T23:21:23.210+0000] {abstractoperator.py:573} DEBUG - Updated in place to become <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>
[2023-09-02T23:21:23.215+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>
[2023-09-02T23:21:23.218+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>
[2023-09-02T23:21:23.224+0000] {taskinstance.py:956} DEBUG - Setting task state for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [None]> to skipped
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1512} DEBUG - Skipping SLA check for <DAG: demo_trigger_one_failed> because no tasks in DAG have SLAs
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1504} DEBUG - callback is empty
[2023-09-02T23:21:23.241+0000] {scheduler_job_runner.py:414} INFO - 3 tasks up for execution:
    <TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [scheduled]>
    <TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [scheduled]>
    <TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [scheduled]>
benbuckman commented 1 year ago

Seems as though it only affects mapped tasks.

Yes, thanks @RNHTTR for that clarification. I updated the issue title accordingly.

hussein-awala commented 1 year ago

@ephraimbuddy the problem is in the method get_relevant_upstream_map_indexes. When we try to get the relevant map index for the upstream deliver_record, we call this method:

ti.get_relevant_upstream_map_indexes(
    upstream=ti.task.dag.task_dict[upstream_id],
    ti_count=expanded_ti_count,
    session=session,
)

we call it with these values:

ti.get_relevant_upstream_map_indexes(
    upstream="deliver_records.deliver_record",
    ti_count=3, # we have 3 tis because it's a mapped task group
    session=session,
)

This method doesn't take the mapped task group into account, so it returns -1 instead of the map index of the checked TI.

So we have two options:

  1. Update get_relevant_upstream_map_indexes to handle mapped task groups.
  2. Detect that the two tasks are in the same mapped task group without calling this method; in that case we can return the map index of the checked TI (see the sketch after this list).
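
For illustration, a minimal sketch of the idea behind option 2 (the helper and its argument names are hypothetical, not Airflow APIs): if the checked TI and the upstream task sit in the same mapped task group, and the upstream is not expanded further inside it, each expansion of the group is independent, so the relevant upstream map index is simply the checked TI's own map index.

def relevant_upstream_map_index(ti_map_index: int, shares_mapped_task_group: bool) -> int | None:
    """Hypothetical short-circuit for two tasks inside the same mapped task group.

    Returns the map index the trigger-rule dep should compare against, or None
    to fall back to the existing get_relevant_upstream_map_indexes() logic.
    """
    if shares_mapped_task_group:
        # e.g. map_index=0 of handle_failed_delivery should only look at
        # map_index=0 of fake_sensor / deliver_record, and so on.
        return ti_map_index
    return None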

I'm already working on refactoring some queries in this class, including the one that has this bug:

                task_id_counts = session.execute(
                    select(TaskInstance.task_id, func.count(TaskInstance.task_id))
                    .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
                    .where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))
                    .group_by(TaskInstance.task_id)
                ).all()
benbuckman commented 1 year ago

Thanks @hussein-awala for digging into the fix so quickly.

Something else worth looking into and fixing here is why unit tests with DebugExecutor behave differently. Take this unit test as an example – simplified again for demonstration:

import unittest
from datetime import datetime, timezone

from airflow.exceptions import BackfillUnfinished
from airflow.executors.debug_executor import DebugExecutor
from airflow.models import DagBag
from airflow.models.taskinstance import TaskInstance

from .demo_trigger_one_failed import demo_trigger_one_failed

class TestDemoDag(unittest.TestCase):
    def test_handle_failed_delivery(self):
        dagbag = DagBag(include_examples=False, safe_mode=False)
        demo_dag = dagbag.get_dag("demo_trigger_one_failed")
        now = datetime.now(timezone.utc)

        # We need to use the slow DebugExecutor (not `dag.test()`) to run this
        # b/c of https://github.com/apache/airflow/discussions/32831
        demo_dag.clear()
        with self.assertRaises(BackfillUnfinished):
            demo_dag.run(
                start_date=now,
                end_date=now,
                executor=DebugExecutor(),
                run_at_least_once=True,
                verbose=True,
                disable_retry=True,
            )

        downstream_task_ids = list(demo_dag.task_group_dict["deliver_records"].children.keys())
        print(f"downstream_task_ids: {downstream_task_ids}")

        task_instance_states: dict[str, str | None] = {}  # task_id => state

        for task_id in downstream_task_ids:
            # (demo simplified w/ hard-coded 0 for single mapped task)
            ti = TaskInstance(demo_dag.task_dict[task_id], execution_date=now, map_index=0)
            task_instance_states[task_id] = ti.current_state()

        print(f"task_instance_states: {task_instance_states}")

        self.assertEqual("success", task_instance_states["deliver_records.submit_job"])
        self.assertEqual("failed", task_instance_states["deliver_records.fake_sensor"])
        self.assertEqual("upstream_failed", task_instance_states["deliver_records.deliver_record"])

        # Test says this ran and succeeded - but in actual scheduler/UI,
        # that's not true!
        self.assertEqual("success", task_instance_states["deliver_records.handle_failed_delivery"])

Put that in a file test_demo_trigger_one_failed.py next to the file with the DAG above, and run it with python -m unittest path/to/test_demo_trigger_one_failed.py.

Note that task_instance_states at the end is shown to be:

task_instance_states: 
{'deliver_records.submit_job': 'success',
 'deliver_records.fake_sensor': 'failed',
 'deliver_records.deliver_record': 'upstream_failed',
 'deliver_records.handle_failed_delivery': 'success'
}

and the test passes, asserting that handle_failed_delivery succeeded. This is misleading test output because, as we see above, in the real scheduler handle_failed_delivery doesn't run at all! (Its ti.current_state() should be None, not success.)

We have unit tests like this in our application, were confident that our actual task with trigger_rule=ONE_FAILED would work correctly, and were very surprised when it broke in production.

While fixing this against the "real" executors (SequentialExecutor in the simple demo above; KubernetesExecutor et al. in a real production application), it would be great if DebugExecutor behaved at parity with the others, so users can rely on their test coverage.

Thank you!

ephraimbuddy commented 1 year ago

@hussein-awala thanks for debugging. I was busy with releases for the past 2 days; getting to look at it now.

ephraimbuddy commented 1 year ago

Ok. Found a fix for this but still need to reason more about it:

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82282eb39d..31fc1bfa9b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2859,7 +2859,7 @@ class TaskInstance(Base, LoggingMixin):
         # and "ti_count == ancestor_ti_count" does not work, since the further
         # expansion may be of length 1.
         if not _is_further_mapped_inside(upstream, common_ancestor):
-            return ancestor_map_index
+            return abs(ancestor_map_index)

         # Otherwise we need a partial aggregation for values from selected task
         # instances in the ancestor's expansion context.
ephraimbuddy commented 1 year ago

Update: this case is similar to the one I just raised a PR for: https://github.com/apache/airflow/pull/34138

I can reproduce this in a test and am working on it.

ephraimbuddy commented 1 year ago

Hi @uranusjr, can you chime in on the diff I posted above? I can't find a way to test it, but it actually solves this case for a mapped task group with ONE_FAILED. When we calculate ancestor_map_index, we multiply by self.map_index, which can be -1. Is there a reason we chose not to care whether the map_index is negative?

uranusjr commented 1 year ago

The variable comes from here:

https://github.com/apache/airflow/blob/5744b424f3ddb3efdf5c2607d13e084955907cc8/airflow/models/taskinstance.py#L2855-L2856

So there are only two scenarios where abs would make a difference:

  1. Either ancestor_ti_count or ti_count is negative
  2. self.map_index is negative

The first seems unlikely, since both of those count something and should be non-negative integers. Of course it's possible we implemented something wrong, but in that case that bug should be fixed so the two values are always zero or positive.

The second is more plausible: the only possible negative map_index value is -1, i.e. the task instance is not expanded. That in turn is not right, since this function should only be called on a mapped task instance in the first place; a task instance with a map index of -1 should not appear here. And if it does, turning -1 into 1 does not make sense as a fix.
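
To make the arithmetic concrete, here is a sketch assuming an ancestor_map_index formula of the shape described in the comments above (self.map_index scaled by a ratio of the two counts); it is not the exact code at the lines linked above:

def ancestor_map_index(map_index: int, ancestor_ti_count: int, ti_count: int) -> int:
    # Sketch only; the real computation lives at the linked lines in taskinstance.py.
    return map_index * ancestor_ti_count // ti_count

print(ancestor_map_index(2, 3, 3))        # 2  -> the normal, expanded case
print(ancestor_map_index(-1, 3, 3))       # -1 -> an unexpanded TI (map_index=-1) leaks through
print(abs(ancestor_map_index(-1, 3, 3)))  # 1  -> abs() silently points at map_index=1 instead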

benbuckman commented 10 months ago

Any updates on this?

ephraimbuddy commented 10 months ago

Any updates on this?

The PR is awaiting reviews

shahar1 commented 2 months ago

I'm reopening this issue because the original PR (#34337) that should have solved it was reverted, and no other fix has been merged since then. I'd like to take the chance to improve @ephraimbuddy's fix so that issues like https://github.com/apache/airflow/issues/35541 don't recur.

mis98zb commented 1 month ago

I tried the patch and found an issue: if the one_failed-marked task is in an expanded task group, a single upstream task failure causes the one_failed task to be triggered in every expansion.

for example:

task1 -> taskA1
task2 -> taskA2
task3 -> taskA3
task4 -> taskA4

task2's failure will trigger taskA1/2/3/4.
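
For concreteness, a minimal sketch of the structure being described (illustrative names; same pattern as the original DAG in this issue):

from __future__ import annotations

from airflow.decorators import task, task_group
from airflow.utils.trigger_rule import TriggerRule

@task
def work(record: str) -> None:
    if record == "2":  # plays the role of task2 failing
        raise RuntimeError("boom")

@task(trigger_rule=TriggerRule.ONE_FAILED)
def on_failure(record: str) -> None:
    pass

@task_group(group_id="per_record")
def per_record(record: str):
    work(record=record) >> on_failure(record=record)

# Inside a DAG: per_record.expand(record=["1", "2", "3", "4"])
# Expected: only the "2" expansion's on_failure runs.
# Reported behavior with the patch: on_failure runs in all four expansions.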