apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.13k stars 14.31k forks source link

@hookimpl on_dag_run_running, on_dag_run_success, on_dag_run_failed do not find Connections and Variables #39646

Open ChrnvaN opened 6 months ago

ChrnvaN commented 6 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

I wrote a listener plugin and added it to the plugins directory. The listener includes the methods:

airflow.exceptions.AirflowNotFoundException: The conn_id my_connection isn't defined


Although the connection is explicitly defined in Connections and everything works in the case of listening to tasks.

How can I fix it so that if I listen to Dog Run, everything works too?

### What you think should happen instead?

I expect the values to be pulled up from Connection and Variable when listening to DagRun.

### How to reproduce

Add to Connection "my_connection" and add to Variable "environment" 

from airflow.listeners import hookimpl from airflow.models.taskinstance import TaskInstance from airflow.hooks.base import BaseHook from airflow.models import Variable from airflow.models.dagrun import DagRun from airflow.plugins_manager import AirflowPlugin

class AirflowListener: @hookimpl def on_task_instance_running(self, task_instance: TaskInstance) -> None: my_connection = BaseHook.get_connection("my_connection") env = Variable.get("environment")

@hookimpl
def on_task_instance_success(self, task_instance: TaskInstance) -> None:
    my_connection = BaseHook.get_connection("my_connection")
    env = Variable.get("environment")

@hookimpl
def on_task_instance_failed(self, task_instance: TaskInstance) -> None:
    my_connection = BaseHook.get_connection("my_connection")
    env = Variable.get("environment")

@hookimpl
def on_dag_run_running(self, dag_run: DagRun):
    my_connection = BaseHook.get_connection("my_connection")
    env = Variable.get("environment")

@hookimpl
def on_dag_run_success(self, dag_run: DagRun):
    my_connection = BaseHook.get_connection("my_connection")
    env = Variable.get("environment")

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun):
    my_connection = BaseHook.get_connection("my_connection")
    env = Variable.get("environment")

class AirflowListenerPlugin(AirflowPlugin): name = "AirflowListener" listeners = [AirflowListener()]



### Operating System

macOS Sonoma 14.1.2

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

 apache/airflow:2.8.1-python3.9
 executor: CeleryExecutor

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.