astronomer / dag-factory

Dynamically generate Apache Airflow DAGs from YAML configuration files
Apache License 2.0

Add support for templating `on_failure_callback` #252

Closed: jroach-astronomer closed this 4 weeks ago

jroach-astronomer commented 1 month ago

Add support for templating in on_failure_callback. Requester is specifically looking to support templated parameters that do not need to be stored in a .py file.

Here's an example (from the issue submitted by @matveykortsev) for reference:

from airflow.providers.slack.notifications.slack import send_slack_notification

# Fragment of a DAG's default_args: the notifier is built in Python, and its
# templated `text` is rendered with the task context when a task fails.
default_args = {
    'on_failure_callback': [
        send_slack_notification(
            slack_conn_id='slack',
            text="""
                :red_circle: Task Failed.
                *Task*: {{ ti.task_id }}
                *Dag*: {{ ti.dag_id }}
                *Execution Time*: {{ ti.execution_date }}
                *Log Url*: {{ ti.log_url }}
                """,
            channel="analytics-alerts",
            username="Airflow",
        )
    ],
}

Closes: #209

codecov-commenter commented 1 month ago

Codecov Report

Attention: Patch coverage is 95.23810% with 1 line in your changes missing coverage. Please review.

Project coverage is 93.95%. Comparing base (1c999f5) to head (7088269). Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| dagfactory/dagbuilder.py | 95.23% | 1 Missing :warning: |

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #252      +/-   ##
==========================================
- Coverage   93.96%   93.95%   -0.02%
==========================================
  Files           8        8
  Lines         630      645      +15
==========================================
+ Hits          592      606      +14
- Misses         38       39       +1
```


jroach-astronomer commented 1 month ago

@tatiana, @pankajastro - should be all set to merge. Thanks!

pankajastro commented 1 month ago

@jroach-astronomer and I connected to discuss this PR, and we decided to add an example in the README.md for this feature. We will include this in the 0.2.0 release.

jroach-astronomer commented 1 month ago

Updated to include documentation in the README.md, as well as support for callbacks defined at default_args level. Also integrates a recent change to allow for callbacks defined using a file and name.

pankajastro commented 1 month ago

Hey @jroach-astronomer, were you able to test this? I tried to test it with Slack but got the error below:

Broken DAG: [/usr/local/airflow/dags/example_dag_factory.py]
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 42, in import_string
    return getattr(module, class_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: module 'airflow.providers.slack.notifications' has no attribute 'slack import send_slack_notification'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/astro/.local/lib/python3.12/site-packages/dagfactory/dagbuilder.py", line 842, in set_callback
    on_state_callback_callable: Callable = import_string(on_state_callback_params["callback"])
                                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 44, in import_string
    raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class')
ImportError: Module "airflow.providers.slack.notifications" does not define a "slack import send_slack_notification" attribute/class

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/astro/.local/lib/python3.12/site-packages/dagfactory/dagfactory.py", line 158, in clean_dags
    dags: Dict[str, Any] = self.build_dags()
                           ^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.12/site-packages/dagfactory/dagfactory.py", line 112, in build_dags
    raise DagFactoryException(f"Failed to generate dag {dag_name}. verify config is correct") from err
dagfactory.exceptions.DagFactoryException: Failed to generate dag example_dag. verify config is correct

YAML

default:
  default_args:
    owner: "default_owner"
    start_date: 2018-03-01
    end_date: 2018-03-05
    retries: 1
    retry_delay_sec: 300
    #on_success_callback_name: print_hello_from_callback
    #on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
    on_failure_callback:
      callback: airflow.providers.slack.notifications.slack import send_slack_notification
      text: |
          :red_circle: Task Failed.
          This task has failed and needs to be addressed.
          Please remediate this issue ASAP.
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "0 1 * * *"
  on_failure_callback_name: print_hello_from_callback
  on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py

example_dag:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "this is an example dag"
  schedule_interval: "0 3 * * *"
  render_template_as_native_obj: True
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: print_hello
      python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
      dependencies: [task_1]

Based on the example here: https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/notifications/slackwebhook_notifier_howto_guide.html, it looks like we may need to handle the notification import a bit differently, or am I doing something wrong in testing?

jroach-astronomer commented 1 month ago

@pankajastro, good catch, there is a typo in my example. It should be 'airflow.providers.slack.notifications.slack.send_slack_notification' for that callback.

I will change that and push a new commit.
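
For context, the ImportError above follows from how a dotted path is resolved: the string is split on its last dot, so everything after that dot is treated as the attribute name. The sketch below is a simplified stand-in written with the standard library only, not Airflow's actual `import_string`; the helper names `split_dotted_path` and `resolve` are hypothetical.

```python
from importlib import import_module
from typing import Any, Tuple


def split_dotted_path(dotted_path: str) -> Tuple[str, str]:
    """Split 'pkg.module.attr' into ('pkg.module', 'attr') on the last dot."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} doesn't look like a module path")
    return module_path, attr_name


def resolve(dotted_path: str) -> Any:
    """Import the module part and return the named attribute from it."""
    module_path, attr_name = split_dotted_path(dotted_path)
    module = import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as err:
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute/class'
        ) from err


# The string from the failing YAML: the text after the last dot is not a valid name.
bad = "airflow.providers.slack.notifications.slack import send_slack_notification"
bad_parts = split_dotted_path(bad)

# The corrected string splits into a module path and a plain attribute name.
good = "airflow.providers.slack.notifications.slack.send_slack_notification"
good_parts = split_dotted_path(good)

# A standard-library path is used here so the example runs without Airflow installed.
join = resolve("os.path.join")
```

With the `import` keyword left in the string, the "attribute" becomes `slack import send_slack_notification`, which matches the name reported in the traceback.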

jroach-astronomer commented 1 month ago

Update pushed!

pankajastro commented 1 month ago

I am getting the error below. Do you have a YAML file that I can use for testing?

[2024-10-21, 20:42:10 UTC] {taskinstance.py:3311} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/customized/operators/breakfast_operators.py", line 13, in execute
    raise Exception("error")
Exception: error
[2024-10-21, 20:42:10 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=example_breadfast, task_id=make_bread_1, run_id=manual__2024-10-21T20:42:06.309471+00:00, execution_date=20241021T204206, start_date=20241021T204210, end_date=20241021T204210
[2024-10-21, 20:42:10 UTC] {taskinstance.py:1563} INFO - Executing callback at index 0: partial
[2024-10-21, 20:42:10 UTC] {taskinstance.py:1567} ERROR - Error in callback at index 0: partial
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3159, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3183, in _execute_task
    return _execute_task(self, context, task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/customized/operators/breakfast_operators.py", line 13, in execute
    raise Exception("error")
Exception: error
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 1565, in _run_finished_callback
    callback(context)
TypeError: SlackNotifier.__init__() takes 1 positional argument but 2 positional arguments (and 4 keyword-only arguments) were given
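
The log shows the callback running as a `partial`, and the final TypeError is consistent with the task context being passed positionally to a constructor that accepts only keyword arguments. The sketch below reproduces that class of error with a stand-in class; `FakeNotifier` and its parameters are hypothetical and only mimic a keyword-only `__init__`, so this is one possible reading of the traceback rather than a confirmed diagnosis.

```python
from functools import partial


class FakeNotifier:
    """Stand-in for a notifier: configured with keywords, then called with a context."""

    def __init__(self, *, slack_conn_id="slack_default", text="", channel="", username=""):
        self.slack_conn_id = slack_conn_id
        self.text = text
        self.channel = channel
        self.username = username

    def __call__(self, context):
        return f"[{self.channel}] {self.text} (task={context['task_id']})"


kwargs = dict(slack_conn_id="slack", text="Task failed", channel="alerts", username="Airflow")
context = {"task_id": "make_bread_1"}

# Wrapping the class in a partial defers instantiation, so calling it with the
# context forwards the context as a positional argument to __init__.
broken_callback = partial(FakeNotifier, **kwargs)
try:
    broken_callback(context)
    error_message = None
except TypeError as err:
    error_message = str(err)

# Instantiating first yields an object whose __call__ accepts the context.
working_callback = FakeNotifier(**kwargs)
message = working_callback(context)
```

In this sketch the fix is to hand the scheduler an already-constructed instance, so that the context reaches `__call__` instead of `__init__`.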

tatiana commented 1 month ago

We are moving this to the 0.21 milestone since @jroach-astronomer is on leave until after the next release. We can have pre-releases with this change as soon as the issue pointed out by @pankajastro is solved.

jroach-astronomer commented 1 month ago

@pankajastro, @tatiana, I've updated the dev/dags/ directory to include the example_callbacks.py file. However, I'm running into an issue when it comes to adding example_callbacks.py to the .airflowignore for the integration tests. It seems to be exactly what was mentioned in this issue (https://github.com/apache/airflow/issues/23532), as it's only happening for Airflow Version == 2.3. Any thoughts here?

jroach-astronomer commented 1 month ago

@pankajastro, I've gone ahead and tested the changes using the Astro project in the dev/ directory. Here was the issue: I did not include a slack_conn_id in the on_failure_callback definition (hence the parameter exception).

Those 3 failing integration tests are due to the known symlink issue in Airflow 2.3.0. We'll need to resolve that in order for the integration tests to run.

I'm also going to finish adding this callback functionality to all callbacks (at the DAG and task level, including success, retry, etc.).

jroach-astronomer commented 1 month ago

@pankajastro, @tatiana, wanted to check in on integration tests. Please see the comment above. Thanks!

pankajastro commented 1 month ago

> Those 3 failing integration tests are due to the known symlink issue in Airflow 2.3.0. We'll need to resolve that in order for the integration tests to run.

I think we can skip this particular test for Airflow 2.3.0. You may have to adjust this file a bit: https://github.com/astronomer/dag-factory/blob/main/tests/test_example_dags.py
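
One way to express that skip is a small version check feeding a `pytest.mark.skipif` condition. The helper below is a hypothetical sketch (the name `is_airflow_2_3` is not from the repository), and the commented usage assumes the installed Airflow version is read from `airflow.__version__`.

```python
def is_airflow_2_3(version: str) -> bool:
    """Return True for any 2.3.x version string, e.g. '2.3.0' or '2.3.4'."""
    major, minor = (int(part) for part in version.split(".")[:2])
    return (major, minor) == (2, 3)


# Assumed usage inside the test module (requires pytest and Airflow):
#   import airflow, pytest
#
#   @pytest.mark.skipif(
#       is_airflow_2_3(airflow.__version__),
#       reason="known symlink issue in Airflow 2.3",
#   )
#   def test_example_callbacks(): ...
```

Comparing integer tuples rather than raw strings avoids treating a version such as 2.10 as if it were older than 2.3.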

pankajastro commented 1 month ago

Which Slack channel are you using to test this? We may need to configure it in CI. Has it already been configured?

pankajastro commented 1 month ago

I was able to test the YAML DAG below with a Slack webhook notification:

example_callbacks:
  default_args:
    start_date: "2024-01-01"
    on_failure_callback:
      callback: airflow.providers.slack.notifications.slack_webhook.send_slack_webhook_notification
      slack_webhook_conn_id: slack_conn_id
      text: "Hello World!"
  schedule_interval: "@daily"
  catchup: False
  tasks:
    start:
      operator: airflow.operators.python.PythonOperator
      python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
      python_callable_name: succeeding_task
    end:
      operator: airflow.operators.python.PythonOperator
      python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
      python_callable_name: failing_task
      dependencies:
        - start
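
To make the shape of this configuration concrete: the `callback` key names an importable factory, and the remaining keys become its keyword arguments. The sketch below illustrates that idea with the standard library only; `build_callback` is a hypothetical helper, not dag-factory's implementation, and `string.Template` stands in for the Slack notifier so the example runs without Airflow.

```python
from importlib import import_module
from typing import Any, Dict


def build_callback(params: Dict[str, Any]) -> Any:
    """Import the factory named by 'callback' and call it with the other keys."""
    kwargs = dict(params)  # copy so the caller's config is not mutated
    module_path, _, name = kwargs.pop("callback").rpartition(".")
    factory = getattr(import_module(module_path), name)
    return factory(**kwargs)


# Mirrors the YAML mapping: one dotted path plus keyword arguments.
config = {
    "callback": "string.Template",  # stand-in for the notifier's dotted path
    "template": "Task $task_id failed",
}
callback = build_callback(config)
rendered = callback.substitute(task_id="end")
```

The returned object is configured once at parse time and used later with runtime values, which is the same split the YAML keys `callback`, `slack_webhook_conn_id`, and `text` imply.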

matveykortsev commented 2 weeks ago

@pankajastro Hi! Any idea when this will be released?