apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

ExternalTaskMarker with user_defined_macros does not work in Airflow 2.0 or with store_serialized_dags = True in Airflow 1.10.* #13827

Closed. yuqian90 closed this issue 1 year ago.

yuqian90 commented 3 years ago

When ExternalTaskMarker is used together with user_defined_macros, we get an unpleasant traceback when trying to clear tasks across DAGs.

This example (modified from example_dags/example_external_task_marker_dag.py) reproduces the issue. When a user clicks parent_task and hits Clear, they get the following traceback, because the serialized DAG that the webserver loads has no user_defined_macros.

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
from airflow.utils.dates import days_ago


def get_child():
    # Reachable from templates only through the parent DAG's user_defined_macros.
    return "child_task1"


with DAG(
    "example_external_task_marker_parent",
    start_date=days_ago(2),
    user_defined_macros={"get_child": get_child},
) as parent_dag:
    # external_task_id is rendered with Jinja (see the traceback), so
    # rendering it requires get_child to be defined.
    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="example_external_task_marker_child",
        external_task_id="{{ get_child() }}",
    )

with DAG("example_external_task_marker_child", start_date=days_ago(2)) as child_dag:
    child_task1 = ExternalTaskSensor(
        task_id="child_task1",
        external_dag_id=parent_dag.dag_id,
        external_task_id=parent_task.task_id,
        mode="reschedule",
    )
    child_task2 = DummyOperator(task_id="child_task2")
    child_task1 >> child_task2

Traceback (most recent call last):
  File "/python/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/python/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/python/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/python/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/python/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/python/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/python/site-packages/airflow/www_rbac/decorators.py", line 121, in wrapper
    return f(self, *args, **kwargs)
  File "/python/site-packages/flask_appbuilder/security/decorators.py", line 109, in wraps
    return f(self, *args, **kwargs)
  File "/python/site-packages/airflow/www_rbac/decorators.py", line 56, in wrapper
    return f(*args, **kwargs)
  File "/python/site-packages/airflow/www_rbac/views.py", line 1332, in clear
    return self._clear_dag_tis(dag, start_date, end_date, origin,
  File "/python/site-packages/airflow/www_rbac/views.py", line 1278, in _clear_dag_tis
    tis = dag.clear(
  File "/python/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/python/site-packages/airflow/models/dag.py", line 1086, in clear
    ti.render_templates()
  File "/python/site-packages/airflow/models/taskinstance.py", line 1424, in render_templates
    self.task.render_template_fields(context)
  File "/python/site-packages/airflow/models/baseoperator.py", line 714, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/python/site-packages/airflow/models/baseoperator.py", line 721, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/python/site-packages/airflow/models/baseoperator.py", line 750, in render_template
    return jinja_env.from_string(content).render(**context)
  File "/python/site-packages/jinja2/asyncsupport.py", line 76, in render
    return original_render(self, *args, **kwargs)
  File "/python/site-packages/jinja2/environment.py", line 1008, in render
    return self.environment.handle_exception(exc_info, True)
  File "/python/site-packages/jinja2/environment.py", line 780, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/python/site-packages/jinja2/_compat.py", line 37, in reraise
    raise value.with_traceback(tb)
  File "<template>", line 1, in <module>
jinja2.exceptions.UndefinedError: 'get_child' is undefined

Apache Airflow version: Airflow 2.0 or 1.10.* with store_serialized_dags = True

Kubernetes version (if you are using kubernetes) (use kubectl version): NA
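To see why the webserver's copy of the DAG cannot render {{ get_child() }}, here is a stdlib-only toy model; it is not Airflow code. It assumes a JSON-based serialized representation of the DAG (which is what the serialized DAG store holds); because Python functions cannot be JSON-encoded, such a copy has nowhere to keep user_defined_macros. The names dag_kwargs, toy_serialize and toy_render are hypothetical stand-ins, and toy_render only understands the simple "{{ name() }}" form used in the example rather than real Jinja.

```python
import json
import re


def get_child():
    return "child_task1"


# Toy stand-in for the parent DAG's arguments (not Airflow's real DAG object).
dag_kwargs = {
    "dag_id": "example_external_task_marker_parent",
    "user_defined_macros": {"get_child": get_child},
}


def toy_serialize(kwargs):
    """Hypothetical JSON serializer: keep only the fields JSON can encode.

    Python functions are not JSON-encodable, so user_defined_macros is dropped.
    """
    kept = {}
    for key, value in kwargs.items():
        try:
            json.dumps(value)
        except TypeError:
            continue  # e.g. {"get_child": <function>}
        kept[key] = value
    # Round-trip through a JSON string, as if written to and read back from a DB.
    return json.loads(json.dumps(kept))


# Matches only the simple "{{ name() }}" form used in the example DAG.
MACRO_CALL = re.compile(r"\{\{\s*(\w+)\(\)\s*\}\}")


def toy_render(template, macros):
    """Toy stand-in for Jinja: replace '{{ name() }}' with macros[name]()."""

    def substitute(match):
        name = match.group(1)
        if name not in macros:
            raise NameError(f"'{name}' is undefined")
        return str(macros[name]())

    return MACRO_CALL.sub(substitute, template)


template = "{{ get_child() }}"

# Scheduler/worker side: the DAG file was parsed, so the macro is present.
print(toy_render(template, dag_kwargs["user_defined_macros"]))  # child_task1

# Webserver side: only the serialized copy is available.
served = toy_serialize(dag_kwargs)
print("user_defined_macros" in served)  # False
try:
    toy_render(template, served.get("user_defined_macros", {}))
except NameError as exc:
    print(exc)  # 'get_child' is undefined
```

The toy raises NameError where the real webserver raises jinja2.exceptions.UndefinedError, but the cause is the same: the rendering context simply has no get_child.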

yuqian90 commented 3 years ago

Note that for any operator that uses macros from user_defined_macros in its Jinja template fields, clicking "Rendered Template" in the Airflow UI shows the error message "Webserver does not have access to User-defined Macros or Filters". That is how the webserver currently handles this case for "Rendered Template", and it is exactly the same problem as the one reported in this issue: in both cases the Airflow webserver has no reference to the DAG's user_defined_macros. If there are plans to make "Rendered Template" work with user_defined_macros, the same solution can probably be applied to the ExternalTaskMarker issue reported here.

Or, if there is no plan to solve the "Rendered Template" issue, maybe I can put up a PR to handle the ExternalTaskMarker error in the same fashion (i.e. catch the exception and flash an error message on the web page when it happens).

@kaxil what do you think?
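The "catch the exception and flash an error message" option could look roughly like the following minimal sketch. The helper clear_or_flash and the fake classes are hypothetical; in a real webserver view, clear would wrap the dag.clear(...) call, flash would be Flask's flash(message, category), and undefined_error would be jinja2.exceptions.UndefinedError. They are passed in as parameters here only so the sketch runs without Airflow, Flask or Jinja installed.

```python
from typing import Callable, List, Optional, Type

# Message quoted from the existing "Rendered Template" behaviour.
MACROS_MSG = "Webserver does not have access to User-defined Macros or Filters"


def clear_or_flash(
    clear: Callable[[], List[str]],
    flash: Callable[[str, str], None],
    undefined_error: Type[Exception],
) -> Optional[List[str]]:
    """Run a clear operation; turn a template-rendering failure into a UI message.

    Hypothetical helper: returns the cleared task ids on success, or None after
    flashing an error message if rendering hit an undefined macro.
    """
    try:
        return clear()
    except undefined_error as exc:
        flash(f"{MACROS_MSG}: {exc}", "error")
        return None


# Fakes standing in for jinja2's UndefinedError and for dag.clear(...).
class FakeUndefinedError(Exception):
    pass


def failing_clear() -> List[str]:
    raise FakeUndefinedError("'get_child' is undefined")


messages = []
result = clear_or_flash(
    failing_clear, lambda msg, cat: messages.append((cat, msg)), FakeUndefinedError
)
print(result)         # None
print(messages[0][0])  # error
```

The trade-off is that the user sees a readable message instead of a traceback, but the cross-DAG clear still does not happen; it only makes the failure explicit.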

yuqian90 commented 3 years ago

I tried to discuss this on Slack in #airflow-2-0. One possible workaround at the moment is to use an AirflowPlugin instead of user_defined_macros. Plugin macros are available in the webserver, so Jinja templates that use them render correctly there. The only downside is that such macros are more cumbersome to call in Jinja templates. For example, with user_defined_macros we could write {{ next_holiday(execution_date) }}, but with an AirflowPlugin we have to write {{ macros.my_plugin.next_holiday(execution_date) }}.
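A sketch of the plugin workaround, under stated assumptions: next_holiday, its HOLIDAYS calendar and the plugin name my_plugin are hypothetical. The actual registration needs Airflow, so it appears only as a comment (an AirflowPlugin subclass with a name and a macros list). The runnable part uses plain Python objects to mimic the two lookup paths a template goes through, which is where the extra verbosity of the plugin form comes from.

```python
import datetime
import types

# Hypothetical holiday calendar and macro, for illustration only.
HOLIDAYS = [
    datetime.date(2021, 1, 1),
    datetime.date(2021, 5, 31),
    datetime.date(2021, 7, 5),
    datetime.date(2021, 12, 24),
]


def next_holiday(execution_date):
    """Return the first holiday strictly after execution_date (or None)."""
    if isinstance(execution_date, datetime.datetime):
        execution_date = execution_date.date()  # compare date to date
    return next((day for day in HOLIDAYS if day > execution_date), None)


# Plugin registration (requires Airflow, so shown as a comment only):
#
#   from airflow.plugins_manager import AirflowPlugin
#
#   class MyPlugin(AirflowPlugin):
#       name = "my_plugin"
#       macros = [next_holiday]

# user_defined_macros style: the function sits at the top level of the
# context, so a template calls {{ next_holiday(execution_date) }}.
udm_context = {"next_holiday": next_holiday}

# Plugin style: the function lives in a module named after the plugin under
# `macros`, so a template calls {{ macros.my_plugin.next_holiday(...) }}.
my_plugin = types.ModuleType("my_plugin")
my_plugin.next_holiday = next_holiday
plugin_context = {"macros": types.SimpleNamespace(my_plugin=my_plugin)}

when = datetime.datetime(2021, 6, 1)
short_form = udm_context["next_holiday"](when)
long_form = plugin_context["macros"].my_plugin.next_holiday(when)
print(short_form, long_form)  # 2021-07-05 2021-07-05
```

Both paths call the same function; the plugin form works in the webserver because the function comes from an importable plugin module rather than from the DAG file.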

@kaxil suggested the following small change to dag.clear(), which should make ExternalTaskMarker tasks that have already run work with dag.clear(), because get_rendered_template_fields() reads the already-rendered values from the DB instead of rendering the templates again:

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 15332f334..30aae0083 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1260,7 +1260,7 @@ class DAG(LoggingMixin):
                                     max_recursion_depth, ExternalTaskMarker.__name__, ti.task_id
                                 )
                             )
-                        ti.render_templates()
+                        ti.get_rendered_template_fields()
                         external_tis = session.query(TI).filter(
                             TI.dag_id == task.external_dag_id,
                             TI.task_id == task.external_task_id,
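The idea behind the diff can be sketched without Airflow: store the rendered values when the task runs, and let the clear path read them back instead of rendering again. In this stdlib sketch the rendered_fields table, make_store, save_rendered, fields_for_clear, render_without_macros and the dates are all hypothetical; they only stand in for whatever get_rendered_template_fields() reads from the DB. This sketch falls back to rendering when nothing is stored, which is why it helps only for markers that have already run.

```python
import sqlite3
from typing import Callable, Dict


def make_store() -> sqlite3.Connection:
    """Hypothetical table of rendered template fields, one row per field."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE rendered_fields ("
        " dag_id TEXT, task_id TEXT, execution_date TEXT, field TEXT, value TEXT,"
        " PRIMARY KEY (dag_id, task_id, execution_date, field))"
    )
    return conn


def save_rendered(conn, dag_id, task_id, execution_date, fields: Dict[str, str]) -> None:
    """Worker side: store the values produced when the task actually ran."""
    conn.executemany(
        "INSERT OR REPLACE INTO rendered_fields VALUES (?, ?, ?, ?, ?)",
        [(dag_id, task_id, execution_date, name, value) for name, value in fields.items()],
    )


def fields_for_clear(conn, dag_id, task_id, execution_date,
                     render: Callable[[], Dict[str, str]]) -> Dict[str, str]:
    """Webserver side: prefer stored values; render only if nothing was stored."""
    rows = conn.execute(
        "SELECT field, value FROM rendered_fields"
        " WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
        (dag_id, task_id, execution_date),
    ).fetchall()
    if rows:
        return dict(rows)
    return render()  # may still fail when the macros are missing


def render_without_macros() -> Dict[str, str]:
    # Re-rendering on the webserver: get_child is not defined there.
    raise NameError("'get_child' is undefined")


conn = make_store()
save_rendered(conn, "example_external_task_marker_parent", "parent_task",
              "2021-01-20T00:00:00", {"external_task_id": "child_task1"})

# Already ran: the stored value is used and nothing is rendered.
print(fields_for_clear(conn, "example_external_task_marker_parent", "parent_task",
                       "2021-01-20T00:00:00", render_without_macros))
# {'external_task_id': 'child_task1'}

# Never ran: nothing is stored, so the render fallback still fails.
try:
    fields_for_clear(conn, "example_external_task_marker_parent", "parent_task",
                     "2021-01-21T00:00:00", render_without_macros)
except NameError as exc:
    print(exc)  # 'get_child' is undefined
```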

kaxil commented 2 years ago

Hey @yuqian90, did the proposed solution work for you locally? Do you think we should make a PR to fix this?

(Apologies, I couldn't reply here earlier.)

yuqian90 commented 2 years ago

Hi, @kaxil, yeah I can take another look and put up a PR.

eladkal commented 1 year ago

@yuqian90 is this issue still relevant?

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 1 year ago

This issue has been closed because it has not received a response from the issue author.