apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

default_args feature incompatible with Dynamic Task Mapping #29366

Open m1racoli opened 1 year ago

m1racoli commented 1 year ago

Apache Airflow version

2.5.1

What happened

We use default_args to configure common parameters across our Airflow deployments. Some of those parameters are specific to certain operators or a group of operators.

When we use dynamic task mapping in addition to our default_args, the DAG fails to parse.

What you think should happen instead

The DAG should parse successfully.

How to reproduce

The DAG

from typing import List

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = dict(
    start_date=days_ago(1),
    use_legacy_sql=False,
    something="else",
)

@task
def get_commands() -> List[str]:
    return [
        "echo hello",
        "echo world",
    ]

with DAG(
    "test_default_args",
    default_args=default_args,
    schedule=None,
) as dag:
    commands = get_commands()

    BashOperator.partial(
        task_id="run_bash",
    ).expand(
        bash_command=commands,
    )

will result in the following error

Broken DAG: [/usr/local/airflow/dags/test_dynamic_args.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 149, in __attrs_post_init__
    validate_mapping_kwargs(self.operator_class, "partial", self.kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 109, in validate_mapping_kwargs
    raise TypeError(f"{op.__name__}.{func}() got {error}")
TypeError: BashOperator.partial() got unexpected keyword arguments 'use_legacy_sql', 'something'

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.2.0
apache-airflow-providers-apache-hive==5.1.1
apache-airflow-providers-apache-livy==3.2.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cncf-kubernetes==5.1.1
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-databricks==4.0.0
apache-airflow-providers-dbt-cloud==2.3.1
apache-airflow-providers-elasticsearch==4.3.3
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-google==8.8.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-microsoft-azure==5.1.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-sftp==4.2.1
apache-airflow-providers-snowflake==4.0.2
apache-airflow-providers-sqlite==3.3.1
apache-airflow-providers-ssh==3.4.0

Deployment

Astronomer

Deployment details

We use Astronomer Runtime 7.2.0.

Anything else

No response

Are you willing to submit PR?

Code of Conduct

pankajastro commented 1 year ago

Both params use_legacy_sql and something do not look correct to me, though. How are you using these parameters? I believe it does not throw errors in the case of non-dynamic tasks because of the https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L374 block, i.e. we copy only valid params from default_args to kwargs, so invalid params get ignored.

But in the case of a dynamic task we remove the valid params from all the params, and if we are left with any params we throw an error: https://github.com/apache/airflow/blob/main/airflow/models/mappedoperator.py#L92
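
Roughly, the difference between the two code paths looks like this (a simplified sketch of my reading of the linked code, not the actual Airflow implementation):

import inspect

def copy_defaults_baseoperator_style(op_cls, default_args):
    # Normal operators: keep only the default_args keys that match a
    # parameter of the operator's __init__; everything else is ignored.
    params = inspect.signature(op_cls.__init__).parameters
    return {k: v for k, v in default_args.items() if k in params}

def validate_mappedoperator_style(op_cls, kwargs):
    # Mapped operators: any kwarg outside the __init__ signature raises,
    # which is what produces the "unexpected keyword arguments" TypeError.
    params = inspect.signature(op_cls.__init__).parameters
    unknown = set(kwargs) - set(params)
    if unknown:
        raise TypeError(
            f"{op_cls.__name__}.partial() got unexpected keyword arguments {sorted(unknown)}"
        )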

m1racoli commented 1 year ago

The above parameters are just an example. The point is that we centrally define certain parameters which only apply to particular operators and are ignored for other operators. This works with normal operators, but not with mapped operators.

pankajastro commented 1 year ago

Hmm, I agree that it is not consistent, but when you pass kwargs using default_args, those kwargs are applied to all operators of the DAG (of course you can override the values of these params). What should the right behaviour be here: should it fail for normal operators as well, or should it work for mapped operators? IMO it should fail for the normal operators as well.

m1racoli commented 1 year ago

IMO it should fail for the normal operators as well.

That would limit the use case for default_args dramatically. I kinda expected the current behaviour for normal operators to be part of the feature set of default_args.
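
For illustration, this selective application is exactly what we rely on today (a minimal sketch; EmptyOperator stands in for any operator without an env parameter):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    "env": {"HELLO": "WORLD"},  # only meaningful for BashOperator
}

with DAG(
    "selective_defaults",
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
    BashOperator(task_id="bash", bash_command="echo $HELLO")  # picks up env
    EmptyOperator(task_id="noop")  # env is silently ignored, no error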

bryzgaloff commented 1 year ago

Hi @m1racoli, thank you for reporting the issue :+1: I have encountered the same limitation, and it surprised me. I agree that default_args should keep working for dynamic task mapping, and in the same way as for regular operators: applied only where applicable to a particular operator, based on its signature. Probably, default_args are simply copied into expand's too-broad **mapped_kwargs signature, which causes the issue.

As a quick potential workaround, you may wrap a mapped operator into a TaskGroup with default_args={}, because here is what the docs say:

TaskGroup also supports default_args like DAG, it will overwrite the default_args in DAG level

I have not tested it myself but happened to notice this in the documentation, so if you have a chance to try it out, please let me know if it works.
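
An untested sketch of what I mean, reusing the repro DAG from above (this assumes the task_group decorator forwards default_args to the underlying TaskGroup):

from airflow.decorators import task_group

with DAG(
    "test_default_args",
    default_args=default_args,  # still contains the operator-specific keys
    schedule=None,
) as dag:
    commands = get_commands()

    @task_group(default_args={})  # shadow the DAG-level default_args
    def run_bash_group():
        BashOperator.partial(task_id="run_bash").expand(bash_command=commands)

    run_bash_group()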

potiuk commented 1 year ago

@uranusjr - do you think it would be possible/easy to add default_args to the partial invocation automatically? It sounds like a possibility; my only worry is whether expand would override them if they are also specified there?

m1racoli commented 1 year ago

my only worry is whether expand would override them if they are also specified there

Overriding params in expand which are defined via default_args would be intended behaviour and in line with the behaviour of normal operators, wouldn't it?

potiuk commented 1 year ago

Overriding params in expand which are defined via default_args would be intended behaviour and in line with the behaviour of normal operators, wouldn't it?

Yes. By "worry" I mean, whether there are no problems somewhere deep the stack that would make it difficult or have side efffects that we do not see without deep knowledge of how dynamic task mapping is done. It's easy to say "things should be done this way" and much more difficult to do it.

BTW. @m1racoli -> maybe you would like to implement a POC for that one to dispell the worries? That would be good opportunity to learn the deep internals and contribute back to the project.

uranusjr commented 1 year ago

Intuitively it feels possible, except default_args is much trickier to get right since it deals with unspecified arguments from indefinite operator classes. A POC would make the discussion much easier.

subuserzero commented 1 year ago

As a quick potential workaround, you may wrap a mapped operator into a TaskGroup with default_args={}, because here is what the docs say:

TaskGroup also supports default_args like DAG, it will overwrite the default_args in DAG level

I have not tested it myself but happened to notice this in the documentation, so if you have a chance to try it out, please let me know if it works.

That did not seem to work for me, but replacing the default_args of the DAG with an empty dict inside the task-group definition seems to get past the TypeError. However, that seems to kill the default_args for downstream dependencies, so a bit of sleight-of-hand is needed:

@task_group(group_id='foo_group', dag=dag)
def foo_group():
    def_args = dag.default_args.copy()  # stash the DAG-level defaults
    dag.default_args = {}               # hide them from .partial()/.expand()
    ...                                 # define the mapped tasks here
    dag.default_args = def_args         # restore them for downstream tasks
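
The same trick can be packaged as a small context manager so the restore happens even if task definition raises (my untested refinement of the above):

from contextlib import contextmanager

@contextmanager
def suppressed_default_args(dag):
    # Temporarily blank out dag.default_args so mapped-operator
    # validation never sees the operator-specific defaults.
    saved = dag.default_args
    dag.default_args = {}
    try:
        yield
    finally:
        dag.default_args = saved  # restore for downstream tasks

The mapped .partial()/.expand() calls then go inside a with suppressed_default_args(dag): block.
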
vchiapaikeo commented 1 year ago

I also recently ran into this. Just wanted to share that the issue ends up popping up as a dag parsing error on the UI:

Broken DAG: [/opt/airflow/dags/dataeng/batch/test_dynamic_spark.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 149, in __attrs_post_init__
    validate_mapping_kwargs(self.operator_class, "partial", self.kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 109, in validate_mapping_kwargs
    raise TypeError(f"{op.__name__}.{func}() got {error}")
TypeError: BashOperator.partial() got unexpected keyword arguments 'project_id', 'gcp_conn_id', 'google_cloud_conn_id', 'impersonation_chain', 'auto_page_management', 'alert_in_test', 'abcdef'

I'm actively looking for workarounds 👀

vchiapaikeo commented 1 year ago

@uranusjr, what do you think about something like this to remove invalid keys from OperatorPartial's self.kwargs?

https://github.com/apache/airflow/pull/30056

I'm using the below DAG to test and it's working as expected - valid args like env are applied to BashOperator and invalid ones like abcdef are deleted from self.kwargs.

from airflow import DAG

from airflow.decorators import task
from airflow.operators.bash import BashOperator

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "start_date": "2023-03-11",
    "retries": 0,
    "abcdef": 12312,
    "env": {
        "HELLO": "WORLD",
    },
}

@task
def make_list():
    # This can also come from an API call, checking a database -- almost anything
    # you like, as long as the resulting list/dictionary can be stored in the
    # current XCom backend.
    return [
        "echo 1 ${HELLO}",
        "echo 2 ${HELLO}",
        "echo 3 ${HELLO}",
    ]

with DAG(
    schedule_interval="@daily",
    max_active_runs=1,
    max_active_tasks=10,
    catchup=False,
    dag_id="test_expand_with_default_args",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    xcom_output = make_list.override(do_xcom_push=True)()

    t = BashOperator.partial(task_id="bash_op").expand(bash_command=xcom_output)

Granted, this does make the call to validate_mapping_kwargs in __attrs_post_init__ obsolete. Do we really need this validation step if we are guaranteed to only be passing valid kwargs through now?
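
For reviewers, the shape of the change is roughly this (a paraphrased sketch of the idea described above, not the actual #30056 diff):

import inspect

def prune_default_args(operator_class, partial_kwargs, default_args):
    # Drop keys that came in via default_args but are not real parameters
    # of this operator; explicitly passed invalid kwargs still fail
    # validation as before.
    valid = inspect.signature(operator_class.__init__).parameters
    return {
        k: v
        for k, v in partial_kwargs.items()
        if k in valid or k not in default_args
    }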

Successful DAG: [screenshot]

Mapped Task (1): [screenshot]

If this looks good to you, I can work on unit tests here.

uranusjr commented 1 year ago

There is already #29913 so you’ll need to take it into account.

potiuk commented 10 months ago

As with everything in open source - if you care about a problem, the best way to speed up a solution is to create, and lead to completion, a PR fixing it. This is what I heartily recommend. The second best is to encourage the people who create such PRs and volunteer to help test in-progress solutions: apply patches, review the code, and generally thank the people who do the job and help them complete it. The third best is to pay someone to do the fix - if you have no time or skills to do either of the first two.

This is generally how things work with open source, where you get software for free and without any guarantees; having an ETA expectation when you do not contribute is kind of strange, if you ask me.

But you have all the ways to contribute I described above, @asafsneh. Which one will you choose to speed up resolution of the problem, I wonder?

asafsneh commented 10 months ago

Hi @potiuk , thank you for the helpful recommendation and explanation. I apologize if my previous question may have come across as rude or entitled. It clearly slipped my mind that the people behind this open-source project aren't working specifically for me or others and indeed, expecting an ETA isn't quite fair or respectful.

I appreciate your understanding and the community's effort in maintaining the project. To express my gratitude and commitment, I'll certainly try my best to contribute and solve the problem by working on the code myself. I hope my skills are strong enough. Thanks again for your time and continuous efforts!

harshavardhan commented 8 months ago

I am encountering the same issue. Wondering if there are any known workarounds.

eladkal commented 5 months ago

I also recently ran into this. Just wanted to share that the issue ends up popping up as a dag parsing error on the UI:

Broken DAG: [/opt/airflow/dags/dataeng/batch/test_dynamic_spark.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 149, in __attrs_post_init__
    validate_mapping_kwargs(self.operator_class, "partial", self.kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 109, in validate_mapping_kwargs
    raise TypeError(f"{op.__name__}.{func}() got {error}")
TypeError: BashOperator.partial() got unexpected keyword arguments 'project_id', 'gcp_conn_id', 'google_cloud_conn_id', 'impersonation_chain', 'auto_page_management', 'alert_in_test', 'abcdef'

I'm actively looking for workarounds 👀

@vchiapaikeo were you able to come up with a fix? Happy to review it if so.