apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Skipping an Airflow task makes the dynamic task skipped too, even with TriggerRule.NONE_FAILED #34754

Closed Berenger-Wooclap closed 10 months ago

Berenger-Wooclap commented 12 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We are using Apache Airflow version 2.5.3, and we have a DAG in which multiple tasks are executed sequentially, with the output of the first task serving as input for the subsequent tasks. All these tasks are set to TriggerRule.NONE_FAILED, meaning they should execute as long as no upstream task has failed (skipped upstream tasks do not block them).

However, we've encountered an issue where, if the second task, which is dynamic (task-mapped), is skipped (i.e., it does not run), the following task does not receive the XCom data from the first task. We have implemented a temporary workaround that prevents the second task from being skipped, but we are looking for a permanent solution in which the third task always receives the XCom data from the first task, even if the second task is skipped.
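
For illustration only, the direction we are exploring is to pull the first task's XCom explicitly by task_id, so the third task does not depend on XCom routed through the skipped task. A minimal sketch (the callable name `read_first_task_output` is a placeholder; `show_configuration` is the first task's id from the snippet under "How to reproduce"):

def read_first_task_output(**context):
    # Placeholder callable: pull the first task's XCom directly by task_id,
    # bypassing the (possibly skipped) second task.
    tables = context["ti"].xcom_pull(task_ids="show_configuration")
    print(tables)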

What you think should happen instead

I don't really know what is happening, but I think the XCom is not propagated for some reason.

How to reproduce

Here is the part of the DAG that creates the tasks:


# First task: computes the list of tables to export; its output feeds
# the two mapped tasks below.
tables_to_export = PythonOperator(
    task_id="show_configuration",
    python_callable=get_tables_to_export_from_params,
    on_failure_callback=slack_fail_alert,
    provide_context=True,
)

# Second task (dynamic): one mapped instance per entry in the first
# task's output.
mongodb_exporter = WooclapKubernetesPodOperator.partial(
    config=config,
    image_pull_policy="Always",
    service_account_name=service_account_name,
    default_image_name=default_image_name,
    on_failure_callback=slack_fail_alert,
    name="mongodb-exporter-pod",
    task_id="mongodb-exporter-task",
    pool="mongo-db-pool-limit",
).expand(expand_args=tables_to_export.output)

# Third task (dynamic): also expands over the first task's output.
mongo_dump_to_pg = WooclapKubernetesPodOperator.partial(
    config=config,
    image_pull_policy="Always",
    service_account_name=service_account_name,
    default_image_name=default_image_name,
    on_failure_callback=slack_fail_alert,
    name="mongo-dump-to-pg-pod",
    task_id="mongo-dump-to-pg-task",
    pool="woostats-db-pool-limit",
).expand(expand_args=tables_to_export.output)
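
Note that the trigger rule itself is not visible in this snippet. On a mapped operator, trigger_rule is a BaseOperator argument and therefore belongs in .partial(...), not .expand(...). A minimal sketch with a stock operator (task id and commands are illustrative):

from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

# trigger_rule is a BaseOperator argument, so for a mapped task it is
# passed through .partial(...); .expand(...) only takes mapped arguments.
echo = BashOperator.partial(
    task_id="echo",
    trigger_rule=TriggerRule.NONE_FAILED,
).expand(bash_command=["echo a", "echo b"])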

Operating System

Airflow 2.5.3 deployed on Kubernetes

Versions of Apache Airflow Providers

Airflow 2.5.3

Deployment

Official Apache Airflow Helm Chart

Deployment details

XCom is stored in the Postgres metadata DB, if I'm not mistaken.
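
As a sanity check, the stored XCom entries can be listed through Airflow's stable REST API. A sketch (host, credentials, and the dag/run ids are placeholders; `show_configuration` is the first task's id):

import requests

# Sketch: list XCom entries for the first task via the stable REST API
# (Airflow 2.x). Host, credentials, and dag/run ids are placeholders.
resp = requests.get(
    "http://localhost:8080/api/v1/dags/<dag_id>/dagRuns/<run_id>/"
    "taskInstances/show_configuration/xcomEntries",
    auth=("admin", "admin"),
)
print(resp.json())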

Anything else

Details (screenshots):
![Screenshot 2023-10-04 at 14 05 24](https://github.com/apache/airflow/assets/134722848/2cc9c489-ed05-4331-b88a-b000c599c84c)
![Screenshot 2023-10-04 at 14 04 40](https://github.com/apache/airflow/assets/134722848/74ce880d-cc14-40b4-bd4e-934aba904b66)
![Screenshot 2023-10-04 at 14 04 56](https://github.com/apache/airflow/assets/134722848/5eaa6667-7b58-48cb-9355-fc8e16ab47dc)

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 12 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Taragolis commented 11 months ago

I've been unable to reproduce this on current main; maybe it has already been fixed (I can't find a related issue/PR).

However, this one worked as expected:


from airflow.decorators import task
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.timezone import datetime
from airflow.exceptions import AirflowSkipException

@task
def make_list():
    return [4, 42, 2]

class Double(BaseOperator):
    def __init__(self, x: int, **kwargs):
        super().__init__(**kwargs)
        self.x = x

    def execute(self, context):
        # Skip the mapped task instance whose input is 42.
        if self.x == 42:
            raise AirflowSkipException("42")
        return self.x * 2

class Result(BaseOperator):
    def __init__(self, result: int, **kwargs):
        super().__init__(**kwargs)
        self.result = result

    def execute(self, context):
        print(self.result)

with DAG(
    "issue_34754",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["issue", "34754"]
) as dag:
    double = Double.partial(
        task_id="double-int"
    ).expand(x=make_list())

    # none_failed: the downstream mapped task is not blocked by the
    # skipped upstream instance.
    Result.partial(
        task_id="print-result",
        trigger_rule="none_failed"
    ).expand(result=double.output)

(screenshot of the resulting DAG run attached)

github-actions[bot] commented 11 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 10 months ago

This issue has been closed because it has not received a response from the issue author.