apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Dynamic Task Mapping skips tasks before upstream has started #28973

Closed inytar closed 1 year ago

inytar commented 1 year ago

Apache Airflow version

2.5.0

What happened

In some cases we are seeing dynamically mapped tasks being skipped before their upstream tasks have started and before the number of mapped tasks can be calculated. We see this both locally with the LocalExecutor and on our cluster with the KubernetesExecutor.

To trigger the issue we need multiple dynamic tasks merging into a shared downstream task; see the images below for an example. If there is no merging, the tasks run as expected. The number of mapped tasks must also not be known at DAG start, for example because a task expands over another dynamic task's output.

(Two screenshots: graph view of the test_skip DAG.)

If the DAG, the task, or its upstream tasks are cleared, the skipped task runs as expected.

The issue exists on both Airflow 2.4.x and 2.5.0.

Happy to help debug this further & answer any questions!

What you think should happen instead

The tasks should run after upstream tasks are done.

How to reproduce

The following code is able to reproduce the issue on our side:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator

# With only one chained task, only one of the `skipped` tasks is skipped.
# Adding extra chained tasks results in both `skipped` tasks being skipped,
# but no earlier tasks are ever skipped.
CHAIN_TASKS = 1

@task()
def add(x, y):
    return x, y

with DAG(
    dag_id="test_skip",
    schedule=None,
    start_date=datetime(2023, 1, 13),
) as dag:

    init = EmptyOperator(task_id="init_task")
    final = EmptyOperator(task_id="final")

    for i in range(2):
        with TaskGroup(f"task_group_{i}") as tg:
            chain_task = [i]
            for j in range(CHAIN_TASKS):
                chain_task = add.partial(x=j).expand(y=chain_task)
            skipped_task = (
                add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
            )

        # Task isn't skipped if final (merging task) is removed.
        init >> tg >> final

Operating System

MacOS

Versions of Apache Airflow Providers

This can be reproduced without any extra providers installed.

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template!

uranusjr commented 1 year ago

Can you check whether #28592 fixes this?

ephraimbuddy commented 1 year ago

Working for me in the current main, but the graph is not displaying (cc @bbovenzi).

bbovenzi commented 1 year ago

Working for me in the current main, but the graph is not displaying (cc @bbovenzi).

Did it not load at all? Or just incorrectly? I fixed one graph issue yesterday: https://github.com/apache/airflow/pull/28993

inytar commented 1 year ago

I tried this both on main and with #28592, but I still see the same issue. Like @ephraimbuddy, I don't see the graph either, but I guess that's another issue.


ephraimbuddy commented 1 year ago

@inytar, it works for me in the current main. No skipping. What are your other settings like?

@bbovenzi, it doesn't come up on the graph view in the current main.


bbovenzi commented 1 year ago

Oh I see. I have a fix for the UI part here: https://github.com/apache/airflow/pull/29042

inytar commented 1 year ago

@inytar, it works for me in the current main. No skipping. What are your other settings like?

I've seen the issue in a number of different settings/environments (locally with the LocalExecutor and on our cluster with the KubernetesExecutor).

It seems to me like some kind of race condition. Could you try upping the number of CHAIN_TASKS? If I set it to 1 I only see one task being skipped; if I set it to 2 I see both tasks being skipped.

I've updated the code a bit to add a workaround. It just adds an empty task after each task group, before the merge; with this in place the tasks aren't skipped anymore (also useful for anyone else who might hit this):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator

# With only one chained task, only one of the `skipped` tasks is skipped.
# Adding extra chained tasks results in both `skipped` tasks being skipped,
# but no earlier tasks are ever skipped.
CHAIN_TASKS = 2

# Add workaround
WORKAROUND = False

@task()
def add(x, y):
    return x, y

with DAG(
    dag_id="test_skip",
    schedule=None,
    start_date=datetime(2023, 1, 13),
) as dag:

    init = EmptyOperator(task_id="init_task")
    final = EmptyOperator(task_id="final")

    for i in range(2):
        with TaskGroup(f"task_group_{i}") as tg:
            chain_task = [i]
            for j in range(CHAIN_TASKS):
                chain_task = (
                    add.override(task_id=f"add_{j}").partial(x=j).expand(y=chain_task)
                )
            skipped_task = (
                add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
            )

        init >> tg

        # Workaround: Adding an empty normal task before the merge step fixes the issue.
        if WORKAROUND:
            workaround = EmptyOperator(task_id=f"workaround_{i}")
            tg >> workaround
            next = workaround
        else:
            next = tg

        # Task isn't skipped if final (merging task) is removed.
        next >> final

I'm OoO till Monday, Jan 30th, so I won't be able to help much till then. When I'm back I'm happy to test, etc.!

darkfennertrader commented 1 year ago

Hello, just to let you know that the problem still exists with multiple dynamically mapped tasks in cascade. I worked around the issue using an EmptyOperator.

Airflow 2.5.2 (using the official docker-compose file)
Celery Executor
OS: Ubuntu 22.04
Python version: 3.10.6


import os
from typing import Dict, Any
from datetime import datetime
import logging
import pendulum
from airflow.decorators import dag, task  # type: ignore
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.empty import EmptyOperator
from dags_conf import default_args
from custom_operators.csv_to_postgres import CsvToPostgres  # type: ignore

local_tz = pendulum.timezone("Europe/Rome")  # type: ignore
task_logger = logging.getLogger("airflow.task")
airflow_home: Any = os.getenv("AIRFLOW_HOME")
BASE_DIR: str = airflow_home + "/data_sources/ibkr"
provider_name: Any = os.getenv("ibkr_provider_name")
frequency: Any = os.getenv("ibkr_frequency")

@dag(
    dag_id="ibkr_db_init_v_1_1_0",
    start_date=datetime(2023, 1, 1, tzinfo=local_tz),
    schedule=None,
    catchup=False,
    tags=["db init"],
    default_args=default_args,
)
def populate_db() -> None:
    @task(do_xcom_push=True)  # type: ignore
    def get_assets_years_and_files() -> list[Dict[str, str | list[str]]]:

        # (1) create list of assets
        assets = list(set(os.listdir(BASE_DIR)))

        # (2) create list of assets and relative directories
        assets_years = []
        for asset in assets:
            years: list[str] = list(set(os.listdir(BASE_DIR + "/" + asset)))
            years = [
                y for y in years if os.path.isdir(BASE_DIR + "/" + asset + "/" + y)
            ]

            assets_years.append({"symbol": asset, "years": years})

        return assets_years

    @task  # type: ignore
    def deduplication(symbol: str, years: list[str]) -> None:

        import pandas as pd

        overall_file_path = BASE_DIR + "/" + symbol + "/"

        # print(file_list)
        dframe = pd.DataFrame(
            columns=["Datetime", "Open", "High", "Low", "Close", "Volume"]
        )
        for year in years:
            base_path = BASE_DIR + "/" + symbol + "/" + year
            # list all files within a directory
            data_path: list[str] = [
                f
                for f in os.listdir(base_path)
                if os.path.isfile(os.path.join(base_path, f))
            ]

            # remove overall file if present for idempotency
            if os.path.isfile(os.path.join(overall_file_path, "final.csv")):
                os.remove(os.path.join(overall_file_path, "final.csv"))

            print(symbol)

            for file in data_path:
                filepath = BASE_DIR + "/" + symbol + "/" + year + "/" + file
                # print(filepath)
                data = pd.read_csv(filepath, parse_dates=["Datetime"], date_parser=lambda x: pd.to_datetime(x).tz_localize("UTC"))  # type: ignore
                dframe = pd.concat([dframe, data])  # type: ignore

        # renaming columns to make them compatible with db table columns
        dframe.rename(
            columns={
                "Datetime": "time",
                "Open": "open",
                "High": "high",
                "Low": "low",
                "Close": "close",
                "Volume": "volume",
            },
            inplace=True,
        )

        dframe.set_index("time", drop=True, inplace=True)  # type: ignore
        # deduplication
        dframe = dframe[~dframe.index.duplicated(keep="first")]  # type: ignore
        dframe.sort_index(inplace=True)  # type: ignore
        dframe.to_csv(overall_file_path + "/final.csv")
        print(dframe.shape)

    def list_of_dicts(elem: Dict[str, Any]) -> Dict[str, str]:
        return {
            "symbol": elem["symbol"].replace("-", "/"),
            "provider_name": provider_name,
            "frequency": frequency,
        }

    assets_years = get_assets_years_and_files()
    pg_input: list[dict[str, str]] = assets_years.map(list_of_dicts)  # type: ignore
    deduplicate = deduplication.partial().expand_kwargs(assets_years)  # type: ignore

    complementary_info = PostgresOperator.partial(
        task_id="complementary_info",
        postgres_conn_id="postgres_conn",  # created as env variable
        sql="sql/GET_INFO_FROM_SYMBOLS_PROVIDERS.sql",
    ).expand(  # type: ignore
        parameters=pg_input
    )

    def list_of_str_int(elem: list[list[str | int]]) -> list[str | int]:
        return [y for x in elem for y in x]

    task_input = complementary_info.output.map(list_of_str_int)

    # save complementary info in csv files for postgres IBKR table compatibility
    @task(trigger_rule="all_success", depends_on_past=False, wait_for_downstream=False)  # type: ignore
    def enrich_csv(extra_info: list[Any]) -> None:
        import pandas as pd

        symbol, symbol_id, provider_id, asset_class_id, frequency_id = (
            extra_info[0],
            extra_info[1],
            extra_info[2],
            extra_info[3],
            extra_info[4],
        )
        print(symbol)
        filepath = BASE_DIR + "/" + symbol.replace("/", "-") + "/final.csv"
        dframe = pd.read_csv(filepath, parse_dates=["time"], index_col="time")  # type: ignore
        print(f"before: {dframe.shape}")
        dframe["provider_id"] = provider_id
        dframe["asset_class_id"] = asset_class_id
        dframe["frequency_id"] = frequency_id
        dframe["symbol_id"] = symbol_id
        print(f"after: {dframe.shape}")

        dframe.to_csv(filepath, header=True)

        print(extra_info)

    enrich = enrich_csv.partial().expand(extra_info=task_input)  # type: ignore

    @task  # type: ignore
    def prepare_input() -> list[str]:
        assets = list(set(os.listdir(BASE_DIR)))
        filepaths: list[str] = []
        for elem in assets:
            filepath = "data_sources/ibkr/" + elem + "/final.csv"
            filepaths.append(filepath)

        return filepaths

    csvpg_input = prepare_input()

    solve_bug = EmptyOperator(task_id="solve_bug")

    # save csv to Postgres database
    kwargs: dict[str, Any] = {
        "task_id": "save_data_to_db",
    }

    # filepath = "data_sources/ibkr/AUD-CAD/final.csv"
    # save_to_db = CsvToPostgres(
    #     filepath=filepath, sql="sql/COPY_INTO_IBKR_DATA.sql", **kwargs
    # )

    save_to_db = CsvToPostgres.partial(  # type: ignore
        sql="sql/COPY_INTO_IBKR_DATA.sql", **kwargs
    ).expand(  # type: ignore
        filepath=csvpg_input
    )
    [deduplicate, complementary_info] >> enrich >> solve_bug >> save_to_db  # type: ignore

populate_db()

jose-workpath commented 1 year ago

I also have this error on Airflow 2.5.3.

I found out that it happens when two dynamically mapped tasks are chained (one depends on the other) and the child task of the last mapped task has another parent task, i.e. in a DAG like this:

graph LR
    A["A (mapped task)"] --> B["B (mapped task)"] --> D
    C --> D

The mapped task B will always get skipped, and so will any of its child tasks.

In my case, I cannot do what @darkfennertrader suggested because I use the result of the first mapped task as a parameter of the next mapped task.

I created a simple DAG to reproduce the error I'm getting:

"""
Some bug causes tasks in a DAG like this to always get skipped
when there is more than one expanded task and the downstream
of the last expanded task has another upstream task, i.e.:

`A (mapped) -> B (mapped) -> D`

and:

`C -> D`

This will immediately cause `B`, `D`, and any downstream of `D` to be skipped.

In this DAG example you will see how the tasks `b_make_it_double` (B), `d_wrap_up` (D), and
any downstream task immediately get a 'skipped' status as soon as the DAG is triggered
and are never scheduled for execution.
"""

import datetime
from typing import List

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.decorators import task

@task
def generate_list_of_numbers() -> List[List[int]]:
    """
    Return a list of length 2, each element being the list of integers [1, 2, 3].
    """
    return [list(range(1, 4)) for _ in range(2)]

@task
def a_add_them_together(numbers: List[int]):
    """
    Add the numbers together
    """
    return sum(numbers)

@task
def b_make_it_double(summed: int):
    """
    Multiplies by 2
    """
    return summed * 2

@task
def d_wrap_up(results) -> int:
    """
    Prints the results
    """
    print(results)

@task
def c_some_upstream():
    print("Can be any kind of upstream python task.")

with DAG(
    dag_id="reproduce_skipped_expansions_error",
    schedule_interval=None,
    catchup=False,
    start_date=datetime.datetime(2023, 1, 13),
    dagrun_timeout=datetime.timedelta(minutes=60),
    doc_md=__doc__,
) as dag:

    numbers = generate_list_of_numbers()
    # Expected result: [[1,2,3], [1,2,3]]
    a_added_numbers = a_add_them_together.expand(numbers=numbers)
    # Expected result: [6, 6]
    b_multiplied = b_make_it_double.expand(summed=a_added_numbers)
    # Expected result: [12, 12]
    c_dep = c_some_upstream()
    # Just prints 'multiplied':
    d_wrap = d_wrap_up(results=b_multiplied)
    # Define order of tasks:
    c_dep >> d_wrap
    d_wrap >> EmptyOperator(task_id="any_downstream")

if __name__ == "__main__":
    dag.cli()

uranusjr commented 1 year ago

Thanks for the investigation and the reproduction, that’s great help. I’ll find some time to check if I can find the issue later this week.

uranusjr commented 1 year ago

First finding: d_wrap is skipped because b_make_it_double is not expanded, and the unexpanded task is marked as skipped. Next I need to find out why b_make_it_double is marked as skipped. This happens surprisingly early in scheduling, even before a_add_them_together has actually run!? Weirder still, I can't yet find what causes b_make_it_double to be skipped (there are no relevant logs).
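
For anyone who wants to see the same thing, the task instance states can be inspected directly from the metadata database. A rough sketch (not part of any fix; it assumes the reproduction DAG above is loaded and has been triggered at least once):

from airflow.models import TaskInstance
from airflow.utils.session import create_session

# Rough sketch: print the state of every task instance of the reproduction DAG.
# Unexpanded mapped task instances keep map_index == -1; with this bug they
# show up as "skipped" before their upstream tasks have even run.
with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == "reproduce_skipped_expansions_error")
        .all()
    )
    for ti in tis:
        print(ti.run_id, ti.task_id, ti.map_index, ti.state)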

uranusjr commented 1 year ago

Update: My attempt to induce the issue with an extremely simplified scheduler (calling task_instance_scheduling_decisions repeatedly) does not work; things all run as expected. This leads me to guess the issue may be in the mini scheduler, not the main scheduler loop.
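
The simplified loop was roughly along these lines (a minimal sketch rather than the exact harness; it assumes a dag_run created in a test session, and run_ti is a hypothetical stand-in for however the task instances actually get executed):

from airflow.utils.session import create_session

def run_until_done(dag_run, run_ti):
    # Minimal sketch of driving scheduling by hand: repeatedly ask the DagRun
    # for its scheduling decisions and run whatever becomes schedulable, with
    # no mini scheduler involved at all. `run_ti` is a caller-supplied callable
    # that executes a single task instance synchronously.
    with create_session() as session:
        while True:
            decision = dag_run.task_instance_scheduling_decisions(session=session)
            if not decision.schedulable_tis:
                break
            for ti in decision.schedulable_tis:
                run_ti(ti, session)
            session.flush()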

uranusjr commented 1 year ago

Yup, issue’s gone if I set schedule_after_task_execution = false. This should greatly narrow down the possible causes.
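
For anyone hitting this in the meantime, a possible temporary mitigation suggested by that finding (an assumption, not a verified fix) is to turn the mini scheduler off in the [scheduler] section of the configuration, at the cost of slightly higher task-to-task latency since follow-up tasks then wait for the main scheduler loop:

# airflow.cfg
[scheduler]
schedule_after_task_execution = False

# or, equivalently, as an environment variable:
# AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False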