apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Dynamic mapped task group arguments are interpreted as MappedArgument when provided to classic operators #39222

Closed florian-guily closed 4 months ago

florian-guily commented 6 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.4

What happened?

When a task group is mapped with expand_kwargs, its arguments are not resolved correctly when they are used in classic operators inside that task group. The operators receive MappedArgument objects instead of the arguments' real values.

What you think should happen instead?

The real values of the mapped task group's arguments should be passed to the operators.

How to reproduce

I originally hit this with a Google Cloud operator inside a task group, but I was able to reproduce it with a BashOperator.

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.bash import BashOperator
from pendulum import datetime

dag = DAG(
    dag_id="airflow_issue_test",
    start_date=datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    schedule="@daily",
    default_args={"retries": 2},
)

with dag:

    @task()
    def list_dict_generator():
        my_list = [
            {
                "project": "my_project",
                "dataset": f"dataset_{number}",
                # Keys must match the task group's parameter names: "table_name", not "table".
                "table_name": "my_table",
                "partition_id": "my_partition_id",
            }
            for number in range(10)
        ]
        return my_list

    @task_group()
    def my_tg(project, dataset, table_name, partition_id): 
        BashOperator(
            task_id="bash_task",
            bash_command=f"echo {project}.{dataset}.{table_name}${partition_id}",
            env={"MY_VAR": "Hello World"}
        )

    partitions_to_delete = list_dict_generator()

    my_tg.expand_kwargs(partitions_to_delete)

Here are the associated logs (attached: dag_id=airflow_issue_test_run_id=scheduled__2024-04-14T00_00_00+00_00_task_id=bq_to_gcs_tg.bash_task_map_index=0_attempt=2.log). They clearly show MappedArgument objects, rather than their values, being passed into the echo command, which should not happen.
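A likely explanation for the MappedArgument text in the command (an assumption, not confirmed in this thread): inside the task group function each parameter is a placeholder that Airflow resolves only when a task runs, but a Python f-string is evaluated immediately, when the DAG file is parsed, so `bash_command` ends up holding the placeholder's string form. The sketch below illustrates that eager evaluation with a hypothetical `DeferredArg` class that only loosely imitates MappedArgument; it is not Airflow's implementation.

```python
class DeferredArg:
    """Hypothetical placeholder for a value known only at run time.

    Loosely inspired by Airflow's MappedArgument; not its real implementation.
    """

    def __init__(self, key: str) -> None:
        self.key = key

    def __repr__(self) -> str:
        return f"DeferredArg(key={self.key!r})"

    def resolve(self, context: dict) -> str:
        # The real value is looked up only when "execution" happens.
        return context[self.key]


project = DeferredArg("project")

# "Parse time": the f-string runs now and embeds the placeholder's repr.
eager_command = f"echo {project}"

# "Run time": resolve the placeholder first, then build the command.
runtime_context = {"project": "my_project"}
late_command = f"echo {project.resolve(runtime_context)}"

print(eager_command)  # echo DeferredArg(key='project')
print(late_command)   # echo my_project
```

Under this assumption, the same f-string would behave differently inside a TaskFlow function body, because that body only runs after the arguments have been resolved.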

Operating System

macOS Sonoma 14.2.1 (23C71)

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.16.0
apache-airflow-providers-common-sql>=1.11.0

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit a PR?

Code of Conduct

boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

RNHTTR commented 6 months ago

@florian-guily Would you like to be assigned to this issue?

florian-guily commented 6 months ago

I'd like to, yes. I'll find the time to resolve it!

renzo-sanchez-h commented 6 months ago

Hello there, I've just run into the same issue. It doesn't only happen with classic operators but with anything in a task group's scope, for example:

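from airflow.decorators import task_group

# prepare_pod_config and prepare_pods are defined elsewhere (not shown in this comment).
@task_group
def process_model_requests(model_name):
    config_results = prepare_pod_config(task_id=f"config_for_{model_name}")
    result_paths = prepare_pods(config_results, f"for_{model_name}")
    config_results >> result_paths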

I'm on version 2.6.3, though.

florian-guily commented 6 months ago

Interesting. It's not anything in a task group, though: a Python task created with the TaskFlow API seems to work well. I haven't tried with a classic PythonOperator.

renzo-sanchez-h commented 6 months ago

Yes, sorry, I meant when using dynamic task mapping, like in the original post.

prefix = "process"
task_params = [{"task_id": "_".join([prefix, str(i + 1)]), "model_name": f"model_{i}"} for i in range(3)]
process_model_requests.expand_kwargs(task_params)

joshtree41 commented 5 months ago

I'm seeing this issue as well. I tested a task group with the TaskFlow API and the arguments are picked up properly, but when they are passed to traditional operators they are not.

Locustv2 commented 5 months ago

More examples:

  1. https://github.com/apache/airflow/issues/31481
  2. https://github.com/apache/airflow/discussions/39927

d-callan commented 4 months ago

Is there a known workaround in the meantime, or any update on this, @florian-guily?

florian-guily commented 4 months ago

I didn't find a proper workaround for non-TaskFlow operators, so I just used Python operators for everything.

I'm investigating a fix, but I'm fairly new to open-source contribution, so I'm a bit slow, and I also need to find time for it.

Locustv2 commented 4 months ago

IMO, the only workarounds are to use TaskFlow or to not use mapped arguments.
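One possible reading of "not use mapped arguments" (a sketch, not something proposed in this thread): render the complete commands from plain values in an upstream TaskFlow task and map the classic operator over them directly, for example with `BashOperator.partial(task_id="bash_task").expand(bash_command=...)`, instead of routing the values through a mapped task group. Only the plain-Python rendering step is shown; `build_commands` is a hypothetical helper and the Airflow wiring is an assumption left in a docstring.

```python
import shlex


def build_commands(rows: list[dict]) -> list[str]:
    """Render one complete bash command per row from plain (already resolved) values.

    Hypothetical helper: in a DAG it could be the body of an upstream @task whose
    output is mapped over, e.g. BashOperator.partial(task_id="bash_task").expand(bash_command=...).
    """
    commands = []
    for row in rows:
        target = f"{row['project']}.{row['dataset']}.{row['table_name']}${row['partition_id']}"
        # shlex.quote single-quotes the value, so bash will not expand "$my_partition_id"
        # (the BigQuery partition decorator) as a shell variable.
        commands.append(f"echo {shlex.quote(target)}")
    return commands


if __name__ == "__main__":
    rows = [
        {"project": "my_project", "dataset": f"dataset_{n}", "table_name": "my_table", "partition_id": "my_partition_id"}
        for n in range(2)
    ]
    for command in build_commands(rows):
        print(command)
```

Because each command is a finished string before it reaches the operator, no placeholder ever has to be formatted at parse time.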

potiuk commented 4 months ago

Yes. You can simply use Python subprocess.* calls to run a bash script in such a TaskFlow-decorated method; there is no particular need to use BashOperator if you can just run the bash command yourself.
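A minimal sketch of that suggestion, assuming the echo from the original report stands in for the real bash work; the function name `echo_partition` is hypothetical. Running the command with an argument list instead of a shell string also keeps the `$` in `table_name$partition_id` from being treated as a shell variable, which the original `bash_command` string is exposed to when bash runs it.

```python
import subprocess


# In a DAG this function would carry @task (from airflow.decorators) and be called
# inside the mapped task group; it is left undecorated so the sketch runs without Airflow.
def echo_partition(project: str, dataset: str, table_name: str, partition_id: str) -> str:
    """Run the report's echo command via subprocess and return its output."""
    # Per the reports in this thread, TaskFlow function bodies see resolved values,
    # so this f-string formats strings rather than MappedArgument placeholders.
    target = f"{project}.{dataset}.{table_name}${partition_id}"
    # An argument list without shell=True involves no shell, so "$" is passed literally.
    result = subprocess.run(["echo", target], capture_output=True, text=True, check=True)
    return result.stdout.strip()


if __name__ == "__main__":
    print(echo_partition("my_project", "dataset_0", "my_table", "my_partition_id"))
```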

potiuk commented 4 months ago

Converting it to a discussion, as it is unlikely to be an "Airflow issue".