apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.43k stars 14.11k forks source link

Using `MappedOperator` creates `Triggered DAG` incorrectly #41145

Open phi-friday opened 1 month ago

phi-friday commented 1 month ago

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When I create a TriggerDagRunOperator with partial and expand_kwargs, the Triggered DAG link button appears to be disabled, and when I click it, it leads to the wrong link.

incorrect url sample: https://localhost:8080/www/dags/trigger_test/grid?tab=details&dag_run_id=manual__2024-07-31T00%3A27%3A35.113684%2B00%3A00&task_id=trigger&map_index=0 expected url sample: https://localhost:8080/www/dags/trigger_target/graph?dag_run_id=manual__2024-07-31T00%3A44%3A45.582823%2B00%3A00-0

스크린샷 2024-07-31 오전 9 45 22 스크린샷 2024-07-31 오전 9 45 13

What you think should happen instead?

No response

How to reproduce

# pyright: reportTypedDictNotRequiredAccess=false
from __future__ import annotations

from typing import Any

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def trigger_test() -> None:  # noqa: D103
    @task.python(do_xcom_push=True, multiple_outputs=False)
    def create_conf() -> list[dict[str, Any]]:
        from airflow.operators.python import get_current_context

        context = get_current_context()
        task_instance = context["task_instance"]
        run_id = task_instance.run_id
        return [
            {"conf": {"value": idx}, "trigger_run_id": f"{run_id}-{idx}"}
            for idx in range(2)
        ]

    conf = create_conf()
    trigger = TriggerDagRunOperator.partial(
        trigger_dag_id="trigger_target",
        task_id="trigger",
        deferrable=False,
        wait_for_completion=False,
    ).expand_kwargs(conf)
    other = TriggerDagRunOperator(
        task_id="other",
        trigger_dag_id="trigger_target",
        trigger_run_id="{{ ti.run_id }}-other",
        deferrable=False,
        wait_for_completion=False,
        conf={"value": 3},
    )

    _ = conf >> [trigger, other]

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def trigger_target() -> None:  # noqa: D103
    EmptyOperator(task_id="empty_task")

trigger_test()
trigger_target()

Operating System

PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux" VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/"

Versions of Apache Airflow Providers

➜  airflow pip freeze | grep apache-airflow-providers
apache-airflow-providers-celery==3.7.2
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-common-sql==1.14.2
apache-airflow-providers-docker==3.12.2
apache-airflow-providers-fab==1.1.1
apache-airflow-providers-ftp==3.10.0
apache-airflow-providers-http==4.12.0
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-jdbc==4.3.1
apache-airflow-providers-odbc==4.6.2
apache-airflow-providers-postgres==5.11.2
apache-airflow-providers-redis==3.7.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-sqlite==3.8.1

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

fredthomsen commented 3 days ago

The request to the /extra_links view causes a crash with the follow traceback:

[2024-09-19T03:38:49.712+0000] {app.py:1744} ERROR - Exception on /extra_links [GET]
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/www/auth.py", line 226, in decorated
    return _has_access(
           ^^^^^^^^^^^^
  File "/opt/airflow/airflow/www/auth.py", line 139, in _has_access
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/www/views.py", line 3231, in extra_links
    url = task.get_extra_links(ti, link_name)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/abstractoperator.py", line 541, in get_extra_links
    return link.get_link(self.unmap(None), ti_key=ti.key)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/operators/trigger_dagrun.py", line 78, in get_link
    return build_airflow_url_with_query(query)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/helpers.py", line 242, in build_airflow_url_with_query
    return flask.url_for(f"Airflow.{view}", **query)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/helpers.py", line 256, in url_for
    return current_app.url_for(
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 2034, in url_for
    return self.handle_url_build_error(error, endpoint, values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 2023, in url_for
    rv = url_adapter.build(  # type: ignore[union-attr]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/werkzeug/routing/map.py", line 917, in build
    raise BuildError(endpoint, values, method, self)
werkzeug.routing.exceptions.BuildError: Could not build url for endpoint 'Airflow.grid' with values ['dag_run_id']. Did you forget to specify values ['dag_id']

Has some similarity to #32150.