apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Dynamic tasks fail when default_args contains custom fields #33600

Closed: agomez-etsy closed this issue 1 month ago

agomez-etsy commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Airflow version: 2.6.3

Related to: https://github.com/apache/airflow/issues/29366

DAG

import logging

from airflow import DAG
from airflow.decorators import task

DEFAULT_TASK_ARGS = {
    "owner": "myteam",
    "start_date": "2023-08-18",
    "project_id": "myproject",
    "custom_arg": "foo",
}

@task
def foo() -> list[str]:
    return ["a", "b", "c"]

@task
def bar(val):
    logging.info(val)

with DAG(
    ...
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    dynamic_tasks = bar.expand(val=foo())

    # Some more tasks that use `project_id` and `custom_arg`
    ...

If a DAG's default_args contains keys that are not operator arguments (here project_id and custom_arg), and the result of one of its tasks (foo) is used to expand another task (bar), the expanded tasks fail at runtime with the following error:

[2023-08-21, 21:07:16 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1407, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1531, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2179, in render_templates
    original_task.render_template_fields(context)
  ...
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 788, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: bar__1). Invalid arguments were:
**kwargs: {'project_id': "myproject", 'custom_arg': "foo"}

I tried adding **kwargs to the function signature, but got the same result:

@task
def bar(val, **kwargs):
    logging.info(val)

Finally, I tried calling partial() before expand() to see whether that would fix it:

dynamic_tasks = bar.partial().expand(val=foo())

What you think should happen instead

Mapped tasks should ignore default_args entries that the operator does not accept (such as project_id and custom_arg), as regular tasks do, and run normally.
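A minimal plain-Python sketch of that expectation, assuming the operator's __init__ signature is what decides which default_args keys are valid. filter_default_args and example_operator_init are hypothetical names used only for illustration; this is not Airflow's implementation.

```python
import inspect
from typing import Any, Callable


def filter_default_args(init: Callable[..., Any], default_args: dict[str, Any]) -> dict[str, Any]:
    """Keep only default_args entries that match a named parameter of `init`.

    *args / **kwargs catch-alls are skipped, so unknown keys such as
    "project_id" are dropped instead of being forwarded to the operator.
    """
    named = {
        name
        for name, param in inspect.signature(init).parameters.items()
        if param.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    }
    return {key: value for key, value in default_args.items() if key in named}


# Hypothetical stand-in for an operator __init__ (not Airflow's real signature).
def example_operator_init(self, task_id, owner=None, start_date=None, **kwargs):
    pass


DEFAULT_TASK_ARGS = {
    "owner": "myteam",
    "start_date": "2023-08-18",
    "project_id": "myproject",
    "custom_arg": "foo",
}

# Only owner and start_date survive; project_id and custom_arg are ignored.
print(filter_default_args(example_operator_init, DEFAULT_TASK_ARGS))
```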

How to reproduce

Create a DAG similar to the one in the What happened section and run it.

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-airbyte==3.3.1 apache-airflow-providers-alibaba==2.4.1 apache-airflow-providers-amazon==8.2.0 apache-airflow-providers-apache-beam==5.1.1 apache-airflow-providers-apache-cassandra==3.2.1 apache-airflow-providers-apache-drill==2.4.1 apache-airflow-providers-apache-druid==3.4.1 apache-airflow-providers-apache-flink==1.1.1 apache-airflow-providers-apache-hdfs==4.1.0 apache-airflow-providers-apache-hive==6.1.1 apache-airflow-providers-apache-impala==1.1.1 apache-airflow-providers-apache-kylin==3.2.1 apache-airflow-providers-apache-livy==3.5.1 apache-airflow-providers-apache-pig==4.1.1 apache-airflow-providers-apache-pinot==4.1.1 apache-airflow-providers-apache-spark==4.1.1 apache-airflow-providers-apache-sqoop==3.2.1 apache-airflow-providers-arangodb==2.2.1 apache-airflow-providers-asana==2.2.1 apache-airflow-providers-atlassian-jira==2.1.1 apache-airflow-providers-celery==3.2.1 apache-airflow-providers-cloudant==3.2.1 apache-airflow-providers-cncf-kubernetes==7.4.1 apache-airflow-providers-common-sql==1.5.2 apache-airflow-providers-databricks==4.3.0 apache-airflow-providers-datadog==3.3.1 apache-airflow-providers-dbt-cloud==3.2.1 apache-airflow-providers-dingding==3.2.1 apache-airflow-providers-discord==3.3.0 apache-airflow-providers-docker==3.7.1 apache-airflow-providers-elasticsearch==4.5.1 apache-airflow-providers-exasol==4.2.1 apache-airflow-providers-facebook==3.2.1 apache-airflow-providers-ftp==3.4.2 apache-airflow-providers-github==2.3.1 apache-airflow-providers-google==10.5.0 apache-airflow-providers-grpc==3.2.1 apache-airflow-providers-hashicorp==3.4.1 apache-airflow-providers-http==4.4.2 apache-airflow-providers-imap==3.2.2 apache-airflow-providers-influxdb==2.2.1 apache-airflow-providers-jdbc==4.0.0 apache-airflow-providers-jenkins==3.3.1 apache-airflow-providers-microsoft-azure==6.1.2 apache-airflow-providers-microsoft-mssql==3.4.1 apache-airflow-providers-microsoft-psrp==2.3.1 apache-airflow-providers-microsoft-winrm==3.2.1 
apache-airflow-providers-mongo==3.2.1 apache-airflow-providers-mysql==5.1.1 apache-airflow-providers-neo4j==3.3.1 apache-airflow-providers-odbc==4.0.0 apache-airflow-providers-openfaas==3.2.1 apache-airflow-providers-opsgenie==5.1.1 apache-airflow-providers-oracle==3.7.1 apache-airflow-providers-pagerduty==3.3.0 apache-airflow-providers-papermill==3.2.1 apache-airflow-providers-plexus==3.2.1 apache-airflow-providers-postgres==5.5.1 apache-airflow-providers-presto==5.1.1 apache-airflow-providers-qubole==3.4.1 apache-airflow-providers-redis==3.2.1 apache-airflow-providers-salesforce==5.4.1 apache-airflow-providers-samba==4.2.1 apache-airflow-providers-segment==3.2.1 apache-airflow-providers-sendgrid==3.2.1 apache-airflow-providers-sftp==4.3.1 apache-airflow-providers-singularity==3.2.1 apache-airflow-providers-slack==7.3.1 apache-airflow-providers-smtp==1.2.0 apache-airflow-providers-snowflake==4.2.0 apache-airflow-providers-sqlite==3.4.2 apache-airflow-providers-ssh==3.7.1 apache-airflow-providers-tableau==4.2.1 apache-airflow-providers-tabular==1.2.1 apache-airflow-providers-telegram==4.1.1 apache-airflow-providers-trino==5.1.1 apache-airflow-providers-vertica==3.4.1 apache-airflow-providers-zendesk==4.3.1

Deployment

Other 3rd-party Helm chart

Deployment details

Chart based on the official helm chart. Airflow running on Google Kubernetes Engine (GKE) using KubernetesExecutor.

Anything else

No response

Are you willing to submit PR?

Code of Conduct

eladkal commented 1 year ago

@agomez-etsy, assigned to you; waiting for your PR to address the issue.

dadadima commented 9 months ago

Is there any known workaround to this issue?

I see that #29913 was merged to address this issue; am I missing something? If it isn't fixed, I'd love to contribute to a fix, @agomez-etsy.

agomez-etsy commented 8 months ago

Hey @dadadima94, I don't think that change addressed the bug: it was released in 2.6.0, and I tested on 2.6.3. We're now using 2.7.3, so I can give it another try and will let you know how it goes.

agomez-etsy commented 8 months ago

Hey @dadadima94, I just tried it on 2.7.3 and still see the same error. I was working on this in my free time, but I've been kind of busy lately, so feel free to pick it up!

hkc-8010 commented 8 months ago

Hey @agomez-etsy, how about defining the custom arguments as DAG-level params inside default_args and reading them from the task context in the dynamic tasks? Below is an example; I tested it locally and it works.

from airflow.decorators import dag, task
from datetime import datetime
from airflow.operators.python import get_current_context
import logging

DEFAULT_TASK_ARGS = {
    "owner": "myteam",
    "params": {
        "project_id": "myproject",
        "custom_arg": "foo",
    }
}

@task
def foo() -> list[str]:
    return ["a", "b", "c"]

@task
def bar(val):
    context = get_current_context()
    project_id = context['params']['project_id']
    custom_arg = context['params']['custom_arg']
    logging.info(val)
    logging.info(project_id)
    logging.info(custom_arg)

@dag(
    dag_id="params_example",
    default_args=DEFAULT_TASK_ARGS,
    schedule=None,  # `schedule` replaces the deprecated `schedule_interval` (Airflow 2.4+)
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def params_example():
    bar.expand(val=foo())


params_example_dag = params_example()

kyleburke-meq commented 3 months ago

@agomez-etsy, are there any updates here? This still appears to be an issue on 2.9.

phi-friday commented 2 months ago

This fixes it, but I don't know why.

AIRFLOW__OPERATORS__ALLOW_ILLEGAL_ARGUMENTS=true

If this setting were the cause, I'd expect the error to occur for all BaseOperators, not just MappedOperator.

Of course, it's possible that this doesn't fix the cause of the problem, just the symptom.
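A toy model of why that setting hides the error, assuming (as the traceback above indicates) that the operator's __init__ raises when leftover **kwargs reach it. The environment variable maps to the allow_illegal_arguments option in the [operators] section through Airflow's AIRFLOW__{SECTION}__{KEY} convention, and with it enabled the check appears to warn instead of raising. The flag is global, but only a path that forwards unrecognized default_args keys to __init__ ever trips it, which would explain why only mapped tasks fail. init_operator is a hypothetical stand-in, not Airflow code.

```python
import warnings
from typing import Any, Optional


def init_operator(
    task_id: str,
    allow_illegal_arguments: bool,
    owner: Optional[str] = None,
    **kwargs: Any,
) -> dict:
    """Toy model of the leftover-kwargs check (hypothetical, not Airflow code).

    Keywords the operator does not declare end up in ``kwargs``. With the flag
    off they raise; with it on they only trigger a warning and are dropped.
    """
    if kwargs:
        message = (
            f"Invalid arguments were passed (task_id: {task_id}). "
            f"Invalid arguments were:\n**kwargs: {kwargs}"
        )
        if not allow_illegal_arguments:
            raise ValueError(message)
        warnings.warn(message)
    return {"task_id": task_id, "owner": owner}


# Keys from default_args that reach __init__ on the failing path.
extra = {"project_id": "myproject", "custom_arg": "foo"}

# Flag off (the default): the extra keys cause an error, as in the traceback.
try:
    init_operator("bar__1", allow_illegal_arguments=False, owner="myteam", **extra)
except ValueError as exc:
    print("raised:", exc)

# Flag on: the same call only warns, and the extra keys are ignored.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    op = init_operator("bar__1", allow_illegal_arguments=True, owner="myteam", **extra)
print(op, "warnings:", len(caught))
```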

phi-friday commented 2 months ago

At first, the _hook_apply_defaults property seemed to be using the wrong default_args. On closer inspection, it doesn't.

kaxil commented 2 months ago

@uranusjr / @Lee-W, you might know.

phi-friday commented 2 months ago

Most operators receive only the default_args entries that match the parameters declared on their class, because BaseOperatorMeta._apply_defaults() filters them. However, _TaskDecorator bypasses that filter here and passes the entire default_args (by this point, kwargs already contains all of default_args): https://github.com/apache/airflow/blob/8def71479a8a4e943f8c39f07793c9fb2cf1fcc6/airflow/models/mappedoperator.py#L807-L818

This is where the entire default_args is passed:
https://github.com/apache/airflow/blob/8def71479a8a4e943f8c39f07793c9fb2cf1fcc6/airflow/decorators/base.py#L434-L439
https://github.com/apache/airflow/blob/8def71479a8a4e943f8c39f07793c9fb2cf1fcc6/airflow/decorators/base.py#L484-L488

This is probably too complex an area for me to modify, so I'll leave it at that. I gave it a try, but I'm not sure about the fix.
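The two code paths described in that comment can be contrasted in a toy model. ToyOperator, build_regular and build_mapped are hypothetical names; the sketch only mimics the described behavior (signature-based filtering of default_args for regular operators versus all of default_args already merged into kwargs on the mapped path) and is not Airflow's implementation.

```python
import inspect
from typing import Any, Optional

DEFAULT_ARGS = {"owner": "myteam", "project_id": "myproject", "custom_arg": "foo"}


class ToyOperator:
    """Hypothetical operator: accepts task_id and owner, rejects anything else."""

    def __init__(self, task_id: str, owner: Optional[str] = None, **kwargs: Any) -> None:
        if kwargs:
            raise TypeError(f"Invalid arguments were passed (task_id: {task_id}): {kwargs}")
        self.task_id = task_id
        self.owner = owner


def build_regular(cls, task_id: str, default_args: dict) -> ToyOperator:
    # Regular path, as described for BaseOperatorMeta._apply_defaults():
    # only default_args keys naming a declared __init__ parameter are applied.
    params = inspect.signature(cls.__init__).parameters
    accepted = {
        key: value
        for key, value in default_args.items()
        if key in params and params[key].kind is not inspect.Parameter.VAR_KEYWORD
    }
    return cls(task_id=task_id, **accepted)


def build_mapped(cls, task_id: str, default_args: dict) -> ToyOperator:
    # Mapped path, as described in the comment: the whole default_args is
    # already in kwargs when the operator is constructed, so nothing is filtered.
    kwargs = {**default_args, "task_id": task_id}
    return cls(**kwargs)


print(build_regular(ToyOperator, "bar", DEFAULT_ARGS).owner)  # myteam
try:
    build_mapped(ToyOperator, "bar__1", DEFAULT_ARGS)
except TypeError as exc:
    print(exc)  # project_id and custom_arg are rejected
```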