What happened
DolphinScheduler version 3.2.1. When stopping a process with a Flink task in CLUSTER mode, DolphinScheduler first kills the Flink job's Yarn application. YarnApplicationManager.execYarnKillCommand is invoked, and the yarn kill command fails with the error "yarn: command not found":
[ERROR] 2024-07-03 10:16:53.690 +0800 - Kill yarn application [[application_1714114694986_0041]] failed
org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found
at org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
at org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
at org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
at org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
at org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
at org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
at org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
at org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[ERROR] 2024-07-03 10:16:53.691 +0800 - Cancel application failed: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found
The root cause is that the shell file is executed by sh, not bash:
https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java#L69-L90
sh does not load /etc/profile automatically, so PATH is not set up and sh cannot find the yarn command.
We need to add "source /etc/profile" to load PATH before executing the yarn command.
Change the code as follows:
private void execYarnKillCommand(String tenantCode, String commandFile, String cmd) throws Exception {
    StringBuilder sb = new StringBuilder();
    sb.append("#!/bin/sh\n");
    sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
    sb.append("cd $BASEDIR\n");
    // load /etc/profile so PATH contains the yarn command
    // (note: `source` is a bash-ism; strictly POSIX sh spells it `. /etc/profile`)
    sb.append("source /etc/profile\n");
    sb.append("\n\n");
    sb.append(cmd);

    File f = new File(commandFile);
    if (!f.exists()) {
        org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
                StandardCharsets.UTF_8);
    }

    String runCmd = String.format("%s %s", Constants.SH, commandFile);
    runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
    log.info("kill cmd:{}", runCmd);
    org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
}
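For illustration, a hypothetical call to this method, assuming cmd is the standard YARN CLI kill command (the real values are assembled by YarnApplicationManager at runtime):

// hypothetical example values; the real ones come from the task execution context
String tenantCode = "default";
String applicationId = "application_1714114694986_0041";
String commandFile = "/tmp/dolphinscheduler/exec/" + applicationId + ".kill"; // simplified path
String cmd = "yarn application -kill " + applicationId;
execYarnKillCommand(tenantCode, commandFile, cmd);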
After making this change, YarnApplicationManager.execYarnKillCommand can kill the Yarn application successfully when stopping the Flink task.
However, there are still errors in the logs:
[ERROR] 2024-07-03 15:10:04.875 +0800 - Kill yarn application [[application_1714114694986_0057]] failed
org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: 2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
2024-07-03 15:10:04,863 INFO impl.YarnClientImpl: Killed application application_1714114694986_0057
at org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
at org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
at org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
at org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
at org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
at org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
at org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
at org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[ERROR] 2024-07-03 15:10:04.876 +0800 - Cancel application failed: 2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
I tried starting another Flink task, created the kill command file locally, and ran it by hand. The command succeeded with the output:
2024-07-03 15:37:18,381 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
Killing application application_1714114694986_0059
2024-07-03 15:37:18,883 INFO impl.YarnClientImpl: Killed application application_1714114694986_0059
I am not sure why AbstractShell does not treat this as a successful execution and instead reports the INFO output as an error; presumably the yarn CLI writes its log4j INFO messages to stderr, and that stderr content ends up in the exception message.
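For what it's worth, stderr output alone should not indicate failure; only the exit code should. A minimal standalone sketch (a hypothetical helper, not DolphinScheduler's AbstractShell) of running a command, treating stderr as informational, and failing only on a non-zero exit code:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ExitCodeCheck {

    // Run a command; log stderr instead of treating it as an error by itself,
    // and fail only when the exit code is non-zero.
    static void run(String... cmd) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(cmd).start();
        try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
            String line;
            while ((line = err.readLine()) != null) {
                System.out.println("[stderr] " + line); // yarn's log4j INFO lines land here
            }
        }
        int exitCode = p.waitFor();
        if (exitCode != 0) {
            throw new IOException("command failed with exit code " + exitCode);
        }
    }

    public static void main(String[] args) throws Exception {
        // succeeds even though the command writes to stderr
        run("sh", "-c", "echo 'INFO some log line' >&2; exit 0");
    }
}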
What you expected to happen
1. YarnApplicationManager.execYarnKillCommand kills the Yarn application successfully, without any error.
2. AbstractYarnTask keeps tracking the Yarn application status; while the Yarn application is still running, the task remains in the executing state.
How to reproduce
Currently, the Flink task does not implement tracking of the Yarn application status.
If you run tasks in CLUSTER mode, the task succeeds and finishes right after the job is submitted to Yarn.
If you want to actually stop the Flink job, you have to go to the Yarn application UI and stop it there.
However, I want to track the Yarn application status and end the task from DolphinScheduler, because we do not want to expose our Yarn application WebUI to our users.
I added the following code in FlinkTask to monitor the Yarn application status:
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
    super.handle(taskCallBack);
    if (FlinkDeployMode.CLUSTER.equals(flinkParameters.getDeployMode()) ||
            FlinkDeployMode.APPLICATION.equals(flinkParameters.getDeployMode())) {
        trackApplicationStatus();
    }
}

@Override
public void trackApplicationStatus() throws TaskException {
    log.info("Flink Task Yarn Application Id is " + appIds);
    YarnClient yarnClient = YarnClient.createYarnClient();
    try {
        initialYarnClient(yarnClient);
        // appIds has the form application_<clusterTimestamp>_<id>
        String[] splitAppIds = appIds.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(splitAppIds[1]),
                Integer.parseInt(splitAppIds[2]));
        boolean yarnRunningFlag = true;
        while (yarnRunningFlag) {
            ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
            YarnApplicationState appState = appReport.getYarnApplicationState();
            log.info("Yarn Application State is " + appState);
            if (YarnApplicationState.FAILED.equals(appState)) {
                yarnRunningFlag = false;
                setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            } else if (YarnApplicationState.FINISHED.equals(appState) ||
                    YarnApplicationState.KILLED.equals(appState)) {
                yarnRunningFlag = false;
            }
            Thread.sleep(FlinkConstants.FLINK_YARN_TRACKING_SLEEP_MILLIS);
        }
    } catch (YarnException | IOException | NullPointerException e) {
        log.error("Failed to track application status", e);
        throw new RuntimeException("Failed to track application status");
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        log.info("The current yarn task has been interrupted", ex);
        setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
        throw new TaskException("The current yarn task has been interrupted", ex);
    } finally {
        try {
            // stop and close the YarnClient
            yarnClient.stop();
            yarnClient.close();
        } catch (IOException e) {
            log.error("Close Yarn Client Failed!", e);
        }
    }
}

private void initialYarnClient(YarnClient yarnClient) throws MalformedURLException {
    // load the Hadoop client configuration from HADOOP_CONF_DIR
    YarnConfiguration conf = new YarnConfiguration();
    conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/hdfs-site.xml")).toURI().toURL());
    conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/core-site.xml")).toURI().toURL());
    conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/yarn-site.xml")).toURI().toURL());
    yarnClient.init(conf);
    yarnClient.start();
}
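FlinkConstants.FLINK_YARN_TRACKING_SLEEP_MILLIS above is the polling interval; if it is not already defined in your FlinkConstants, something like this works:

// polling interval for the Yarn application status loop (example value)
public static final long FLINK_YARN_TRACKING_SLEEP_MILLIS = 10 * 1000L;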
After adding this code, the process with the Flink task stays in the EXECUTING state, and when you stop the process, DolphinScheduler tries to kill the Flink Yarn application with the kill command while stopping the task.
Version
3.2.x
In the previous branch, execYarnKillCommand was in ProcessUtils, and the execYarnKillCommand in ProcessUtils is correct.
You need to check that the yarn command is properly installed on the worker server.
The yarn command is installed appropriately on the worker server. By the way, do you have any idea how DolphinScheduler tracks the Yarn application status currently?
If you are sure that the yarn command is installed and works properly, then you need to check whether the user who performs this task has permission to execute sudo, and whether the sudo configuration allows executing yarn.
Can you find out if the kill cmd is correct?
> By the way, do you have any idea how DolphinScheduler tracks the Yarn application status currently?
Right now, DS does not track the Yarn application status. In most cases we don't need to, since the task runs in sync mode; only after a cluster failover would we need to track the Yarn task status. We could call the Yarn REST API to do this, but it is not implemented in DS.
Is there any plan to support sync/async mode for YarnTask? I have implemented code to track the Yarn application status via the Yarn REST API in my own code base.
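For reference, a minimal sketch of what that can look like, assuming the ResourceManager web address (http://node1:8088 here; host taken from the logs above, the default RM web port is an assumption) is reachable from the worker. The /ws/v1/cluster/apps/{appid} endpoint is part of the standard YARN ResourceManager REST API:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class YarnRestStatusCheck {

    // Fetch the application state from the RM REST API. The response JSON looks
    // like {"app": {"state": "RUNNING", ...}}; a real implementation would use a
    // JSON parser (Jackson/Gson) instead of this crude string search.
    static String fetchAppState(String rmWebUrl, String applicationId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(rmWebUrl + "/ws/v1/cluster/apps/" + applicationId))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        String body = response.body();
        int idx = body.indexOf("\"state\":\"");
        return idx < 0 ? "UNKNOWN" : body.substring(idx + 9, body.indexOf('"', idx + 9));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchAppState("http://node1:8088", "application_1714114694986_0059"));
    }
}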
> Can you find out if the kill cmd is correct?
The kill cmd can kill the process on my host.
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.