apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Mark Success but remote task continues running #40339

Closed dotrungkien3210 closed 3 months ago

dotrungkien3210 commented 3 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.9.0

What happened?

I manage an Airflow HA Celery cluster with nearly 150 DAGs, and almost all of them work correctly. Only one DAG has trouble: when I mark a task success, the remote process triggered by the SSHOperator keeps running. The problem occurs only in this one DAG; other DAGs with the same logic work correctly.
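For context, the pattern involved looks roughly like the sketch below. This is a minimal, hypothetical example (the connection id, DAG id, and script path are placeholders, not taken from the report): when a running task is marked success, the local task process is terminated and SSHOperator closes the SSH channel; whether the remote command also stops tends to depend on get_pty, since with a pseudo-terminal the remote process receives a hangup when the channel closes.

from datetime import datetime

from airflow import DAG
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator

with DAG(
    dag_id="ssh_mark_success_demo",      # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    long_running = SSHOperator(
        task_id="long_running_remote_job",
        ssh_hook=SSHHook(ssh_conn_id="my_remote_server"),  # hypothetical connection
        command="bash /opt/jobs/long_job.sh",              # hypothetical script
        get_pty=True,        # request a pseudo-terminal so the remote command
                             # gets a hangup when the Airflow task is killed
        do_xcom_push=False,
    )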

What you think should happen instead?

When the task is marked success, the remote process started over SSH should stop as well; I suspect something in the code is stuck and waiting.

How to reproduce

Try creating multiple SSHHooks in one DAG and running processes on different servers, as in the sketch below.
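A rough reproduction attempt might look like this, assuming the apache-airflow-providers-ssh package and two SSH connections configured in Airflow. Connection ids, the DAG id, and the sleep commands are hypothetical placeholders standing in for the real build and run scripts.

from datetime import datetime

from airflow import DAG
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator

with DAG(
    dag_id="multi_ssh_repro",            # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    hook_a = SSHHook(ssh_conn_id="server_a")   # hypothetical connection
    hook_b = SSHHook(ssh_conn_id="server_b")   # hypothetical connection

    build_a = SSHOperator(
        task_id="build_on_server_a",
        ssh_hook=hook_a,
        command="sleep 600",      # stand-in for a long build script
        get_pty=True,
        do_xcom_push=False,
    )
    run_b = SSHOperator(
        task_id="run_on_server_b",
        ssh_hook=hook_b,
        command="sleep 600",      # stand-in for a long job
        get_pty=True,
        do_xcom_push=False,
    )

    build_a >> run_b
    # While one of these tasks is running, mark it success in the UI and check
    # whether the remote "sleep" process is still alive on the target host.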

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

2.9.0

Deployment

Other

Deployment details

Airflow HA Celery cluster on 2 nodes, with 2 schedulers, 2 consumers, and 2 workers.
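For reference, such an HA CeleryExecutor setup typically points both nodes at the same shared broker, result backend, and metadata database. A minimal sketch of the relevant settings follows; the broker and database URLs are hypothetical placeholders, not taken from this deployment.

# Hypothetical values; both nodes would share the same broker, result backend,
# and metadata database.
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CELERY__BROKER_URL=redis://shared-broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:***@shared-db/airflow
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:***@shared-db/airflow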

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

nathadfield commented 3 months ago

@dotrungkien3210 I'm afraid you are going to have to provide more information and context for anyone to be able to assist you in this. If this is a bug then you'll need to demonstrate it in a way that can be reproduced. If this is more of a support/implementation problem then you will be much better served by speaking to the community in Slack.

dotrungkien3210 commented 3 months ago

Hi @nathadfield, this is the full DAG code:

command_build = 'bash bin/airflow_build.sh '
checkout_branch = 'master'
build_command = 'bash /home/airflow/platform/check_git/checkgit.sh ' + path_project + ' ' + project_name + ' "' + command_build + '" ' + checkout_branch
build_ope = SSHOperator(
    ssh_hook=sshHook,
    task_id='crawler-system_build',
    command=build_command,
    on_failure_callback=create.task_on_failure_callback,
    get_pty=True,
    do_xcom_push=False,
    dag=dag)

command = 'cd ' + path_project + project_name + ' && bash bin/run_migrate_story_book.sh '
run_ope = SSHOperator(
    ssh_hook=sshHook,
    task_id=dag_name,
    do_xcom_push=False,
    on_failure_callback=create.task_on_failure_callback,
    command=command,
    dag=dag)

config = create_dag.getConfig(path="/airflow/crawler-system")

for key, value in config.items():
    getConf = SSHOperator(
        ssh_hook=sshHook,
        task_id='getconfig' + key,
        command='echo "' + value + '" > ' + path_project + project_name + '/' + 'properties/' + key,
        on_failure_callback=create.task_on_failure_callback,
        get_pty=True,
        do_xcom_push=False,
        dag=dag
    )
    getConf >> build_ope

server_bs = create_dag.select_server('', '')
sshHook_bs = SSHHook(server_bs)
command_build_bs = 'bash bin/schedule/build_airflow.sh '
build_command_bs = 'bash /home/airflow/platform/check_git/checkgit.sh ' + path_project_bs + ' ' + project_name_bs + ' "' + command_build_bs + '" ' + checkout_branch
build_ope_bs = SSHOperator(
    ssh_hook=sshHook_bs,
    task_id='brandSafety_build',
    command=build_command_bs,
    on_failure_callback=create.task_on_failure_callback,
    get_pty=True,
    do_xcom_push=False,
    dag=dag)

command_bs = 'cd ' + path_project_bs + project_name_bs + ' && bash bin/schedule/run_check_coverage_storybook.sh '
run_ope_bs = SSHOperator(
    ssh_hook=sshHook_bs,
    task_id='check_coverage_storybook',
    do_xcom_push=False,
    on_failure_callback=create.task_on_failure_callback,
    command=command_bs,
    dag=dag)

config_bs = create_dag.getConfig(path="/airflow/bs-core")
for key, value in config_bs.items():
    getConf = SSHOperator(
        ssh_hook=sshHook_bs,
        task_id='get_configbs' + key,
        command='echo "' + value + '" > ' + path_project_bs + project_name_bs + '/' + 'properties/' + key,
        on_failure_callback=create.task_on_failure_callback,
        get_pty=True,
        do_xcom_push=False,
        dag=dag
    )
    getConf >> build_ope_bs

command_build_repush = 'bash bin/airflow_build.sh '
build_command_repush = 'bash /home/airflow/platform/check_git/checkgit.sh ' + path_project + ' ' + project_name + ' "' + command_build_repush + '" ' + checkout_branch
build_ope_repush = SSHOperator(
    ssh_hook=sshHook_bs,
    task_id='crawler-system_build_repush',
    command=build_command_repush,
    on_failure_callback=create.task_on_failure_callback,
    get_pty=True,
    do_xcom_push=False,
    dag=dag)

command_repush = 'cd ' + path_project + project_name + ' && bash bin/run_repush_storybook.sh '
run_ope_repush = SSHOperator(
    ssh_hook=sshHook_bs,
    task_id='repush_story_book',
    do_xcom_push=False,
    on_failure_callback=create.task_on_failure_callback,
    command=command_repush,
    dag=dag)

config_repush = create_dag.getConfig(path="/airflow/bs-core")
for key, value in config_bs.items():
    getConf = SSHOperator(
        ssh_hook=sshHook_bs,
        task_id='get_configrepush' + key,
        command='echo "' + value + '" > ' + path_project + project_name + '/' + 'properties/' + key,
        on_failure_callback=create.task_on_failure_callback,
        get_pty=True,
        do_xcom_push=False,
        dag=dag
    )
    getConf >> build_ope_repush

[build_ope, build_ope_bs, build_ope_repush] >> run_ope >> run_ope_bs >> run_ope_repush

nathadfield commented 3 months ago

Thanks, but this does not give us the ability to reproduce the problem, if indeed there is a bug behind it rather than something in your code or your implementation. You will have to pinpoint the problem a bit more on your side before anyone can really assess whether there truly is a bug.