apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

dag callbacks not called on SSHOperator (base operator?) #40119

Open trlopes1974 opened 3 weeks ago

trlopes1974 commented 3 weeks ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.2

What happened?

The failure callback on SSHOperator is not called if on_failure_callback is not defined at the task level!

With the DAG default args:

default_args = {
    'owner': 'ttauto',
    'on_success_callback': success_callback,
    'on_failure_callback': failure_callback,
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': Variable.get("DAG_FAILURE_EMAIL", deserialize_json=True),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'max_retry_delay': timedelta(seconds=60),
    'retry_exponential_backoff': True
}
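
(For reference, success_callback and failure_callback are ordinary task callbacks; their actual bodies are not included in this report. A minimal sketch of what such callbacks might look like:)

def success_callback(context):
    # context is the Airflow task context dict passed to callbacks
    print(f"DAG {context['dag'].dag_id}: task {context['task_instance'].task_id} succeeded")

def failure_callback(context):
    print(f"DAG {context['dag'].dag_id}: task {context['task_instance'].task_id} failed")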

If you have a task based on airflow.providers.ssh.operators.ssh.SSHOperator and it fails, the on_failure_callback defined in the DAG default arguments is not called.

Example task:

ssh_command_operator = SSHOperator(
    task_id='ssh_command_operator',
    command='sudo -s -- eval \'su - XXX -c ". /home/XXX/POEM.env && cd KillSession && sqlplus SQLSCRIPT"\'',
    ssh_hook=ssh_hook,
    cmd_timeout=40,
    conn_timeout=20,
    do_xcom_push=True
)

If the above task fails, the failure callback is not executed.

If you define the callbacks on the task itself, then the callback is executed, e.g.:

ssh_command_output_operator = SSHOperator(
    task_id='ssh_command_output_operator',
    command='sudo -s -- eval \'su - XXX -c "SOMECOMMAND"\'',
    ssh_hook=ssh_hook,
    cmd_timeout=90,
    conn_timeout=120,
    on_failure_callback=failure_callback,
    do_xcom_push=True
)

What you think should happen instead?

The callback definitions in the DAG default_args should apply to all tasks defined in the DAG.
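
(Illustrative sketch, not from this report: the expectation is that a callback set in default_args is inherited by every task created in the DAG context.)

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

def failure_callback(context):
    print("task failed:", context["task_instance"].task_id)

with DAG(
    dag_id="callback_inheritance_sketch",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    default_args={"on_failure_callback": failure_callback},
) as dag:
    noop = EmptyOperator(task_id="noop")

# The task created inside the DAG context should pick up the callback from default_args:
print(noop.on_failure_callback is failure_callback)  # expected: True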

How to reproduce

In the description

Operating System

NAME="Red Hat Enterprise Linux" VERSION="8.8 (Ootpa)" ID="rhel" ID_LIKE="fedora" VERSION_ID="8.8" PLATFORM_ID="platform:el8" PRETTY_NAME="Red Hat Enterprise Linux 8.8 (Ootpa)" ANSI_COLOR="0;31" CPE_NAME="cpe:/o:redhat:enterprise_linux:8::baseos" HOME_URL="https://www.redhat.com/" DOCUMENTATION_URL="https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/8" BUG_REPORT_URL="https://bugzilla.redhat.com/" REDHAT_BUGZILLA_PRODUCT="Red Hat Enterprise Linux 8" REDHAT_BUGZILLA_PRODUCT_VERSION=8.8 REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux" REDHAT_SUPPORT_PRODUCT_VERSION="8.8"

Versions of Apache Airflow Providers

(env_airflow) [ttauto@slautop03 ~]$ pip freeze | grep apache-airflow-providers
apache-airflow-providers-celery==3.6.0
apache-airflow-providers-common-io==1.3.0
apache-airflow-providers-common-sql==1.11.0
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-hashicorp==3.4.2
apache-airflow-providers-http==4.9.1
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-postgres==5.6.0
apache-airflow-providers-sftp==4.6.1
apache-airflow-providers-smtp==1.6.1
apache-airflow-providers-sqlite==3.7.1
apache-airflow-providers-ssh==3.7.2

Deployment

Other

Deployment details

Local installation.

Anything else?

We have noticed that neither callback (success / failure) is executed.

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 weeks ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

jang-hs commented 3 weeks ago

I defined on_failure_callback in the default_args (Airflow 2.8.2 environment), and the callback was successfully invoked when the SSH operator task failed.

My installed versions of apache-airflow-providers are as follows:

apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-ftp==3.9.1
apache-airflow-providers-http==4.11.1
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-sqlite==3.8.1
apache-airflow-providers-ssh==3.11.1

vatsrahul1001 commented 3 weeks ago

@trlopes1974 I also tried apache-airflow-providers-ssh==3.11.1 and it worked fine for me. Could you please try with apache-airflow-providers-ssh==3.11.1?

trlopes1974 commented 2 weeks ago

The same result:

[2024-06-11, 09:47:09 WEST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/ssh/operators/ssh.py", line 191, in execute
    result = self.run_ssh_client_command(ssh_client, self.command, context=context)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/ssh/operators/ssh.py", line 176, in run_ssh_client_command
    exit_status, agg_stdout, agg_stderr = self.hook.exec_ssh_client_command(
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/ssh/hooks/ssh.py", line 554, in exec_ssh_client_command
    raise AirflowException("SSH command timed out")
airflow.exceptions.AirflowException: SSH command timed out
[2024-06-11, 09:47:09 WEST] {taskinstance.py:527} DEBUG - Task Duration set to 91.299578
[2024-06-11, 09:47:09 WEST] {taskinstance.py:549} DEBUG - Clearing next_method and next_kwargs.
[2024-06-11, 09:47:09 WEST] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=Kill_Session_Oracle, task_id=ssh_command_operator, execution_date=20240611T084503, start_date=20240611T084538, end_date=20240611T084709
[2024-06-11, 09:47:09 WEST] {transport.py:1893} DEBUG - EOF in transport thread
[2024-06-11, 09:47:09 WEST] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
[2024-06-11, 09:47:09 WEST] {standard_task_runner.py:107} ERROR - Failed to execute job 82450 for task ssh_command_operator (SSH command timed out; 3252796)
[2024-06-11, 09:47:09 WEST] {local_task_job_runner.py:234} INFO - Task exited with return code 1

You can see that the callbacks list is empty ("Calling callbacks: []").

pip freeze | grep -i airflow
apache-airflow==2.8.2
....
apache-airflow-providers-ssh==3.11.1

trlopes1974 commented 2 weeks ago

Another important piece of info: with

on_success_callback = success_callback,
on_failure_callback = failure_callback,

declared at the operator (task) level, you can see the callbacks defined in the Task Details panel; without them, the callbacks are not defined.

with: [screenshot]

without: [screenshot]
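
(One way to check the same thing outside the UI is to load the DAG in a Python shell and inspect the task; the import path below is hypothetical:)

from my_dag_module import Kill_Session_Oracle  # hypothetical module name for the DAG file

task = Kill_Session_Oracle.get_task("ssh_command_operator")
print(task.on_failure_callback)  # prints None when the callback is "not defined", as described above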

trlopes1974 commented 2 weeks ago

Did you guys call the operator directly, or are you using a Python operator to execute it?

vatsrahul1001 commented 2 weeks ago

@trlopes1974 try adding on_failure_callback in default_args.

trlopes1974 commented 2 weeks ago

@vatsrahul1001, that is exactly the problem: on_failure_callback is already defined in the DAG default_args:

default_args = {
    'owner': 'ttauto',
    'on_success_callback': success_callback,
    'on_failure_callback': failure_callback,
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': Variable.get("DAG_FAILURE_EMAIL", deserialize_json=True),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'max_retry_delay': timedelta(seconds=60),
    'retry_exponential_backoff': True
}

Kill_Session_Oracle = DAG(
    dag_id='Kill_Session_Oracle',
    default_args=default_args,
    catchup=False,
    schedule_interval=None,
    schedule=None,
    start_date=datetime(2024, 2, 5),
    doc_md=doc_md_DAG,
    description='Efetua o KillSession em BD Oracle',
    max_active_tasks=1,
    max_active_runs=1,
    params={
        "WORKORDERID": Param(default="", description="O Número da workorder (Workorder ID)", type=["null", "string"]),
        "ISDEBUG": Param(default=False, title="Modo Debug", description="Testa o dag com os valores indicados em DEBUG_DATA", type="boolean"),
        "DEBUG_DATA": Param(default=None, description="WO_DATA", type=["null", "object"])
    },
    render_template_as_native_obj=True,
    tags=['CS00000026', 'SSH']
)

We have also upgraded Airflow to the latest version:

(env_airflow) [ttauto@slautop03 airflow]$ pip freeze | grep -i airflow
apache-airflow==2.9.2
apache-airflow-providers-celery==3.7.2
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-fab==1.1.1
apache-airflow-providers-ftp==3.9.1
apache-airflow-providers-hashicorp==3.7.1
apache-airflow-providers-http==4.11.1
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-postgres==5.11.1
apache-airflow-providers-sftp==4.10.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-sqlite==3.8.1
apache-airflow-providers-ssh==3.11.1

The behavior is the same: no callback is called...

[2024-06-12, 11:30:40 WEST] {transport.py:1909} DEBUG - Sending global request "keepalive@lag.net"
[2024-06-12, 11:30:45 WEST] {taskinstance.py:441} ▼ Post task execution logs
[2024-06-12, 11:30:45 WEST] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/ssh/operators/ssh.py", line 191, in execute
    result = self.run_ssh_client_command(ssh_client, self.command, context=context)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/ssh/operators/ssh.py", line 176, in run_ssh_client_command
    exit_status, agg_stdout, agg_stderr = self.hook.exec_ssh_client_command(
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/ssh/hooks/ssh.py", line 554, in exec_ssh_client_command
    raise AirflowException("SSH command timed out")
airflow.exceptions.AirflowException: SSH command timed out
[2024-06-12, 11:30:45 WEST] {taskinstance.py:562} DEBUG - Task Duration set to 91.352166
[2024-06-12, 11:30:45 WEST] {taskinstance.py:584} DEBUG - Clearing next_method and next_kwargs.
[2024-06-12, 11:30:45 WEST] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=Kill_Session_Oracle, task_id=ssh_command_operator, run_id=manual__2024-06-12T11:28:43+01:00, execution_date=20240612T102843, start_date=20240612T102913, end_date=20240612T103045
[2024-06-12, 11:30:45 WEST] {cli_action_loggers.py:88} DEBUG - Calling callbacks: []
[2024-06-12, 11:30:45 WEST] {standard_task_runner.py:110} ERROR - Failed to execute job 91871 for task ssh_command_operator (SSH command timed out; 151149)
[2024-06-12, 11:30:45 WEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2024-06-12, 11:30:45 WEST] {local_task_job_runner.py:222} ▲▲▲ Log group end

vatsrahul1001 commented 2 weeks ago

@trlopes1974 Below is the sample DAG I am using to test, with both an SSH and a Python operator, and it works fine.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator

# Note: `docs` and `log_checker` referenced below are defined elsewhere in the test file (not shown here).

def fail():
    raise Exception

def mycallback(context):
    print("I HAVE BEEN CALLED! " * 4)

default_args = {
    "owner": "airflow",
    "start_date": datetime(2018, 10, 31),
    'on_failure_callback': mycallback
}

with DAG(
    dag_id="on_failure_callback",
    default_args=default_args,
    schedule_interval="@once",
    catchup=False,
    doc_md=docs,
    tags=["core"],
) as dag:

    test1 = PythonOperator(
        task_id="print_to_logs",
        python_callable=fail,
    )

    # Define the SSHOperator task
    ssh_task = SSHOperator(
        task_id='ssh_task',
        ssh_conn_id='ssh_default',  # SSH connection ID registered in Airflow
        command='echo1 "Hello, World!"',  # intentionally invalid command ('echo1') so the task fails
        dag=dag,
    )

    test2 = PythonOperator(
        task_id="check_logs",
        trigger_rule="one_failed",
        python_callable=log_checker,
        op_args=["print_to_logs", "I HAVE BEEN CALLED!", "I HAVEN'T BEEN CALLED!"],
    )

    test3 = PythonOperator(
        task_id="check_logs1",
        trigger_rule="one_failed",
        python_callable=log_checker,
        op_args=["ssh_task", "I HAVE BEEN CALLED!", "I HAVEN'T BEEN CALLED!"],
    )

test1 >> test2

I see your ssh task has cmd_timeout and conn_timeout; can you try removing them and testing again?

trlopes1974 commented 2 weeks ago

@vatsrahul1001 Thanks for your input. The problem with your example was that it was not calling the ssh_operator task; I have no issues with Python operators failing. Still, it gave me the opportunity to test further. I think that, for some reason, the way the DAG is declared affects the way the callbacks are "attached" to the tasks/operators. Using YOUR code and adding the ssh_operator task to the workflow (with a command line that returns an error), it worked like a charm and the callback was invoked:

    raise AirflowException(f"SSH operator error: exit status = {exit_status}")
airflow.exceptions.AirflowException: SSH operator error: exit status = 127
[2024-06-12, 14:22:16 WEST] {taskinstance.py:562} DEBUG - Task Duration set to 0.653776
[2024-06-12, 14:22:16 WEST] {taskinstance.py:584} DEBUG - Clearing next_method and next_kwargs.
[2024-06-12, 14:22:16 WEST] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=on_failure_callback, task_id=ssh_operator, run_id=scheduled__2018-10-31T00:00:00+00:00, execution_date=20181031T000000, start_date=20240612T132215, end_date=20240612T132216
[2024-06-12, 14:22:16 WEST] {logging_mixin.py:188} INFO - I HAVE BEEN CALLED! I HAVE BEEN CALLED! I HAVE BEEN CALLED! I HAVE BEEN CALLED! 

So, I adapted the code to reflect the behaviour of "my" DAG, and I found that the DAG declaration (this is the only difference I can see) is probably what affects the callbacks.

Instead of using the "with DAG(...)" format, I'm using the "DAGNAMEVAR = DAG(...)" syntax, and this seems to be the only reason for the callback to fail (not being called).

Non-working callback example DAG:


from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from datetime import datetime

def pyoperator():
    print("\nYOU SHALL NOT PASS!!!!" * 4)

def mycallback(context):
    print("\nI HAVE FAILED" * 4 )

default_args = {
    "owner": "airflow",
    "start_date": datetime(2018, 10, 31),
    'on_failure_callback': mycallback   
    }

ONFAILURECALLBACK = DAG( dag_id='on_failure_callback',
    default_args=default_args,
    catchup=False,
    schedule_interval=None, 
    schedule=None,
    start_date=datetime(2024, 2, 5),
    description='failure callback test',
    max_active_tasks=1,
    max_active_runs=1,
    render_template_as_native_obj=True,
    tags=['core','SSH'] 
    )
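
# NOTE: unlike start_task below, test1 and ssh_operator are not created inside a
# "with DAG(...)" block and are not passed dag=ONFAILURECALLBACK; they apparently
# only join the DAG through the >> dependency chain at the bottom of the file.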

test1 = PythonOperator(
    task_id="python_task",
    python_callable=pyoperator,
    )

ssh_hook = SSHHook(
    ssh_conn_id='ssh_default',
    cmd_timeout = 10,
    keepalive_interval=5
    )

ssh_operator = SSHOperator(
    task_id='ssh_operator',
    command='donothing' ,
    ssh_hook=ssh_hook,
    do_xcom_push = True
    )

start_task = EmptyOperator(task_id='Start',dag=ONFAILURECALLBACK)

start_task >> test1 >> ssh_operator

Logs from this dag (no printing from the callback):

File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/paramiko/client.py", line 819, in _auth
    raise SSHException("No authentication methods available")
paramiko.ssh_exception.SSHException: No authentication methods available
[2024-06-12, 15:07:15 WEST] {taskinstance.py:562} DEBUG - Task Duration set to 8.511477
[2024-06-12, 15:07:15 WEST] {taskinstance.py:584} DEBUG - Clearing next_method and next_kwargs.
[2024-06-12, 15:07:15 WEST] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=on_failure_callback, task_id=ssh_operator, run_id=manual__2024-06-12T14:06:51.203163+00:00, execution_date=20240612T140651, start_date=20240612T140706, end_date=20240612T140715
[2024-06-12, 15:07:15 WEST] {cli_action_loggers.py:88} DEBUG - Calling callbacks: []
[2024-06-12, 15:07:15 WEST] {standard_task_runner.py:110} ERROR - Failed to execute job 93276 for task ssh_operator (No authentication methods available; 299584)
[2024-06-12, 15:07:15 WEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2024-06-12, 15:07:15 WEST] {dagrun.py:931} DEBUG - number of tis tasks for <DagRun on_failure_callback @ 2024-06-12 14:06:51.203163+00:00: manual__2024-06-12T14:06:51.203163+00:00, state:running, queued_at: 2024-06-12 14:06:51.237212+00:00. externally triggered: True>: 0 task(s)
[2024-06-12, 15:07:15 WEST] {taskinstance.py:3503} INFO - 0 downstream tasks scheduled from follow-on schedule check

trlopes1974 commented 2 weeks ago

Attaching both DAG versions: example_dags.zip
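
For comparison, a minimal sketch of the shape of the working variant (the "with DAG(...)" form; the callback, hook and tasks are assumed to match the non-working example above, the exact file is in the zip):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator

def pyoperator():
    print("\nYOU SHALL NOT PASS!!!!" * 4)

def mycallback(context):
    print("\nI HAVE FAILED" * 4)

default_args = {
    "owner": "airflow",
    "start_date": datetime(2018, 10, 31),
    "on_failure_callback": mycallback,
}

with DAG(
    dag_id="on_failure_callback_working",
    default_args=default_args,
    catchup=False,
    schedule=None,
    start_date=datetime(2024, 2, 5),
    description="failure callback test",
    tags=["core", "SSH"],
) as dag:
    start_task = EmptyOperator(task_id="Start")

    test1 = PythonOperator(
        task_id="python_task",
        python_callable=pyoperator,
    )

    ssh_hook = SSHHook(ssh_conn_id="ssh_default", cmd_timeout=10, keepalive_interval=5)

    # Created inside the DAG context, so default_args (including the callback) are applied.
    ssh_operator = SSHOperator(
        task_id="ssh_operator",
        command="donothing",
        ssh_hook=ssh_hook,
        do_xcom_push=True,
    )

    start_task >> test1 >> ssh_operator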

the OK logs:

paramiko.ssh_exception.SSHException: No authentication methods available
[2024-06-12, 15:23:50 WEST] {taskinstance.py:562} DEBUG - Task Duration set to 8.00968
[2024-06-12, 15:23:50 WEST] {taskinstance.py:584} DEBUG - Clearing next_method and next_kwargs.
[2024-06-12, 15:23:50 WEST] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=on_failure_callback_working, task_id=ssh_operator, run_id=manual__2024-06-12T14:23:33.741435+00:00, execution_date=20240612T142333, start_date=20240612T142342, end_date=20240612T142350
[2024-06-12, 15:23:50 WEST] {logging_mixin.py:188} INFO - I HAVE FAILED
[2024-06-12, 15:23:50 WEST] {cli_action_loggers.py:88} DEBUG - Calling callbacks: []

trlopes1974 commented 1 week ago

@vatsrahul1001 were you able to try this?

vatsrahul1001 commented 1 week ago

@trlopes1974 I am able to repro this.