Bpazy / blog

My blog; feel free to follow along and join the discussion
https://github.com/Bpazy/blog/issues

XXL-JOB: Clicking "Terminate Task" Has No Effect #290

Open Bpazy opened 1 year ago


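Problem Description

On release day we found that clicking "Terminate Task" in xxl-job had no effect.

Problem Analysis

First, checking the task that could not be terminated in xxl-job shows that its type is GLUE(SHELL), and its shell code is: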

#!/bin/bash
sh /data/f6-purchase-analysis/prod/shell/purchase-analysis-job-web.sh
#common configure
root_dir=/opt/f6
#java home
JAVA_HOME_7="${root_dir}/tools/jdk1.7.0_80"
JAVA_HOME_8="${root_dir}/tools/jdk1.8.0_111"
JAVA_HOME=$JAVA_HOME_8
export JAVA_HOME

# adjust the settings below for each environment
#curtime=`date +%Y-%m-%d`
logpath=/home/f6-purchase-analysis/local-test-web/logs/kettle-`date +%Y-%m-%d`.log
kettle_home=/home/opt/data-integration
job_home=/home/f6-purchase-analysis/local-test-web/

#echo $1 >> $logpath
#if [ ! -n $1 ]; then  
#       $1= $curtime 
#fi
#echo $1 >> $logpath
#echo $curtime >> $logpath

$kettle_home/kitchen.sh -file=$job_home/purchase_analysis_web.kjb -level=Minimal -log=$logpath -param=group_id_set=$1 -param=analysisTime=$2

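So the next step is to read the xxl-job scheduler and executor source code for this task type.

How Task Termination Works

  1. The scheduler obtains a proxy object for the executor;
  2. The executor's kill method is invoked, which:

    1. sets the stop reason;
    2. calls Thread.interrupt() on the job thread. Note that interrupt() only requests a stop; the thread has to react to it.

// ExecutorBizImpl.java
@Override
public ReturnT<String> kill(int jobId) {
    // kill handlerThread, and create new one
    JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
    if (jobThread != null) {
        XxlJobExecutor.removeJobThread(jobId, "scheduling center kill job.");
        return ReturnT.SUCCESS;
    }

    return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread aleady killed.");
}

// XxlJobExecutor.java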
public static void removeJobThread(int jobId, String removeOldReason){
    JobThread oldJobThread = jobThreadRepository.remove(jobId);
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }
}
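The toStop-then-interrupt pattern in removeJobThread can be sketched in plain Java as follows. This is only an illustration: StoppableWorker is a hypothetical stand-in for xxl-job's JobThread, not the library's class, and the 60-second sleep stands in for whatever blocking call the real job thread is sitting in.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for xxl-job's JobThread; not the library's actual class. */
final class StoppableWorker extends Thread {
    private volatile boolean toStop = false;
    private volatile String stopReason;

    /** Records why the worker is being stopped; the run loop checks this flag. */
    void toStop(String reason) {
        this.stopReason = reason;
        this.toStop = true;
    }

    String stopReason() {
        return stopReason;
    }

    @Override
    public void run() {
        while (!toStop) {
            try {
                // Stands in for a blocking call such as Process.waitFor().
                TimeUnit.SECONDS.sleep(60);
            } catch (InterruptedException e) {
                // interrupt() only wakes the blocked call; the flag ends the loop.
            }
        }
    }

    /** Starts a worker, then stops it in the same order as removeJobThread. */
    static StoppableWorker startAndStop(String reason) {
        StoppableWorker worker = new StoppableWorker();
        worker.start();
        worker.toStop(reason);
        worker.interrupt();
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker;
    }

    public static void main(String[] args) {
        StoppableWorker w = startAndStop("scheduling center kill job.");
        System.out.println("alive=" + w.isAlive() + ", reason=" + w.stopReason());
    }
}
```

The point of the sketch is that interrupt() by itself does not stop anything: the thread ends only because the blocking call wakes up and the loop then sees the flag.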

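Next, let's look at how a GLUE(SHELL) task is started. The execution steps are:

  1. Load the task's parameters by ID;
  2. Pick the executor according to the routing rule;
  3. The executor chooses a Handler based on the task type; for GLUE(SHELL) the Handler is ScriptJobHandler;
  4. Run it.

Now let's look at ScriptJobHandler and why it cannot be terminated. ScriptJobHandler delegates the actual execution to ScriptUtil. The key code is: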

// ScriptUtil.java
public static int execToFile(String command, String scriptFile, String logFile, String... params) throws IOException {
    // stdout: print (null if watchdog timeout)
    // stderr: logging + exception (still exists if watchdog timeout)
    // stdin

    FileOutputStream fileOutputStream = null;   //
    try {
        fileOutputStream = new FileOutputStream(logFile, true);
        PumpStreamHandler streamHandler = new PumpStreamHandler(fileOutputStream, fileOutputStream, null);

        // command
        CommandLine commandline = new CommandLine(command);
        commandline.addArgument(scriptFile);
        if (params!=null && params.length>0) {
            commandline.addArguments(params);
        }

        // exec
        // ===== begin: functionality provided by Apache commons-exec =====
        DefaultExecutor exec = new DefaultExecutor();
        exec.setExitValues(null);
        exec.setStreamHandler(streamHandler);
        int exitValue = exec.execute(commandline);  // exit code: 0=success, 1=error
        // ===== end: functionality provided by Apache commons-exec =====
        return exitValue;
    } catch (Exception e) {
        XxlJobLogger.log(e);
        return -1;
    } finally {
        if (fileOutputStream != null) {
            try {
                fileOutputStream.close();
            } catch (IOException e) {
                XxlJobLogger.log(e);
            }

        }
    }
}

Next, look at DefaultExecutor's execute method, which ends up in executeInternal:

private int executeInternal(final CommandLine command, final Map<String, String> environment,
                            final File dir, final ExecuteStreamHandler streams) throws IOException {

    setExceptionCaught(null);

    final Process process = this.launch(command, environment, dir);

    try {
        streams.setProcessInputStream(process.getOutputStream());
        streams.setProcessOutputStream(process.getInputStream());
        streams.setProcessErrorStream(process.getErrorStream());
    } catch (final IOException e) {
        process.destroy();
        throw e;
    }

    streams.start();

    try {

        // add the process to the list of those to destroy if the VM exits
        if (this.getProcessDestroyer() != null) {
            this.getProcessDestroyer().add(process);
        }

        // associate the watchdog with the newly created process
        if (watchdog != null) {
            watchdog.start(process);
        }

        int exitValue = Executor.INVALID_EXITVALUE;

        try {
            exitValue = process.waitFor();
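            /**
                The process here is started by a commons-exec CommandLauncher
                (Java13CommandLauncher on most platforms; VmsCommandLauncher only on OpenVMS).
                process.waitFor() blocks until the subprocess exits. Two things matter here:
                    1. if the waiting thread is interrupted, waitFor() throws InterruptedException;
                    2. if the subprocess's output buffers fill up and nothing drains them, the subprocess
                       stalls and waitFor() never returns (it does not throw); the stream handler
                       started above is what drains them.
                So when xxl-job stops a job, the InterruptedException is thrown here.
            */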
        } catch (final InterruptedException e) {
            /**
                Terminating a job in xxl-job lands here and calls process.destroy().
                destroy() terminates the command that was launched.
            */
            process.destroy();
        }
        finally {
            // see http://bugs.sun.com/view_bug.do?bug_id=6420270
            // see https://issues.apache.org/jira/browse/EXEC-46
            // Process.waitFor should clear interrupt status when throwing InterruptedException
            // but we have to do that manually
            Thread.interrupted();
        }            

        if (watchdog != null) {
            watchdog.stop();
        }

        try {
            streams.stop();
        }
        catch (final IOException e) {
            setExceptionCaught(e);
        }

        closeProcessStreams(process);

        if (getExceptionCaught() != null) {
            throw getExceptionCaught();
        }

        if (watchdog != null) {
            try {
                watchdog.checkException();
            } catch (final IOException e) {
                throw e;
            } catch (final Exception e) {
                throw new IOException(e.getMessage());
            }
        }

        if (this.isFailure(exitValue)) {
            throw new ExecuteException("Process exited with an error: " + exitValue, exitValue);
        }

        return exitValue;
    } finally {
        // remove the process to the list of those to destroy if the VM exits
        if (this.getProcessDestroyer() != null) {
            this.getProcessDestroyer().remove(process);
        }
    }
}
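The interrupt path through waitFor() can be reproduced without commons-exec. The sketch below uses only java.lang.Process. The class name InterruptWaitForDemo is made up for illustration, and it assumes a Unix-like system with `sleep` on the PATH.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.TimeUnit;

final class InterruptWaitForDemo {

    /** Starts a command, wrapping the checked IOException. */
    static Process start(String... command) {
        try {
            return new ProcessBuilder(command).start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Runs `sleep 30`, blocks a worker thread in waitFor(), interrupts that
     * thread, and reports whether the direct child exited as a result.
     */
    static boolean interruptKillsDirectChild() {
        Process process = start("sleep", "30");
        Thread waiter = new Thread(() -> {
            try {
                process.waitFor();      // blocks until the child exits
            } catch (InterruptedException e) {
                process.destroy();      // same reaction as executeInternal
            }
        });
        waiter.start();
        waiter.interrupt();             // what removeJobThread does to the job thread
        try {
            waiter.join(5_000);
            // true if the child has exited within the timeout
            return process.waitFor(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            process.destroyForcibly();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("direct child exited: " + interruptKillsDirectChild());
    }
}
```

Here the command is the direct child, so destroy() reaches it. The rest of the analysis turns on what happens when the direct child is only a wrapper around the real work.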

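The concrete implementation of Process.destroy() differs from platform to platform, but each one ends up in a native method: destroyProcess(pid, force) on Linux and terminateProcess(long handle) on Windows.

Linux: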

JNIEXPORT void JNICALL
Java_java_lang_UNIXProcess_destroyProcess(JNIEnv *env,
                                          jobject junk,
                                          jint pid,
                                          jboolean force)
{
    int sig = (force == JNI_TRUE) ? SIGKILL : SIGTERM;
    kill(pid, sig);
}

Windows:

JNIEXPORT void JNICALL
Java_java_lang_ProcessImpl_terminateProcess(JNIEnv *env, jclass ignored, jlong handle)
{
    // Call the Windows API function TerminateProcess; handle is the process handle, and 1 is the exit code given to the terminated process
    TerminateProcess((HANDLE) handle, 1);
}

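In principle both APIs should terminate the program, so why did it fail? Either kill/TerminateProcess has been hooked and no longer works normally, or there is a permission problem (which can be ruled out, because the child processes are all spawned from this Java process).

That leaves only one suspect: the PID is not the right one. Let's look at the script that launches kettle: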

#!/bin/bash
# the start time is passed in as an argument
# e.g. ./f6-purchase-analysis-job.sh " 24351585207698449,10545342555594201203" "2017-08-15 12:00:00"

#common configure
root_dir=/opt/f6
#java home
JAVA_HOME_7="${root_dir}/tools/jdk1.7.0_80"
JAVA_HOME_8="${root_dir}/tools/jdk1.8.0_111"
JAVA_HOME=$JAVA_HOME_8
export JAVA_HOME

# adjust the settings below for each environment
#curtime=`date +%Y-%m-%d`
logpath=/home/f6-purchase-analysis/local-test-web/logs/kettle-`date +%Y-%m-%d`.log
kettle_home=/home/opt/data-integration
job_home=/home/f6-purchase-analysis/local-test-web/

#echo $1 >> $logpath
#if [ ! -n $1 ]; then  
#       $1= $curtime 
#fi
#echo $1 >> $logpath
#echo $curtime >> $logpath

echo '采购分析开始 ... ' >> $logpath   # logs "purchase analysis started ..."
$kettle_home/kitchen.sh -file=$job_home/purchase_analysis_web.kjb -level=Minimal -log=$logpath -param=group_id_set=$1 -param=analysisTime=$2
echo '采购分析结束' >> $logpath   # logs "purchase analysis finished"

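Line 27 of the script above (the kitchen.sh call) invokes another script, which eventually reaches kettle's spoon.sh, and that script in turn starts a Java process.

Mystery solved: what xxl-job kills is the shell process, because that is the only PID it holds. The signal is not passed on to the shell's children, so the Java process started by the shell keeps running just fine.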
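On Java 9 or later, one way to reach the processes behind the shell is to walk the process tree with ProcessHandle before destroying the parent. The following is a minimal sketch under stated assumptions, not something xxl-job or commons-exec does: the class ProcessTreeKiller and its methods are hypothetical, the demo needs a Unix-like system with `sh` and `sleep`, and the shell command merely stands in for the script that launches kettle.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;

final class ProcessTreeKiller {

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Polls until the process is gone or the timeout elapses. */
    static boolean awaitExit(ProcessHandle handle, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (handle.isAlive() && System.currentTimeMillis() < deadline) {
            pause(50);
        }
        return !handle.isAlive();
    }

    /** Destroys all descendants of root first, then root itself. */
    static boolean destroyTree(ProcessHandle root, long timeoutMillis) {
        // Snapshot the descendants while the parent still exists; once it is
        // gone they are re-parented and no longer reachable from root.
        List<ProcessHandle> descendants = root.descendants().collect(Collectors.toList());
        for (ProcessHandle d : descendants) {
            d.destroy();
        }
        boolean allGone = true;
        for (ProcessHandle d : descendants) {
            allGone &= awaitExit(d, timeoutMillis);
        }
        root.destroy();
        return awaitExit(root, timeoutMillis) && allGone;
    }

    /** Demo: a shell that starts `sleep`, like the job script that starts Java. */
    static boolean demo() {
        Process shell;
        try {
            shell = new ProcessBuilder("sh", "-c", "sleep 30; true").start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        ProcessHandle root = shell.toHandle();
        // Give the shell a moment to fork its child.
        long deadline = System.currentTimeMillis() + 5_000;
        while (root.descendants().count() == 0 && System.currentTimeMillis() < deadline) {
            pause(50);
        }
        boolean hadChild = root.descendants().count() > 0;
        boolean stopped = destroyTree(root, 5_000);
        return hadChild && stopped;
    }

    public static void main(String[] args) {
        System.out.println("tree destroyed: " + demo());
    }
}
```

destroy() here requests normal termination; a process that ignores that request would need destroyForcibly(), which this sketch leaves out.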

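Why Jenkins Can Terminate Child Processes

Anyone who uses Jenkins will be familiar with the DONTKILLME environment variable setting: by default, Jenkins kills every process a build has spawned. So how does Jenkins do it? Jenkins maintains a process tree internally (ProcessTree.java). When it kills a parent process, it walks through all of that parent's child processes and kills them one by one, and it finds them by scanning each process's environment variables.

How does it read a process's environment variables? On Windows, through the interface wrapped by winp, which is implemented in C++ underneath; on Linux, by reading /proc/<pid>/environ.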
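On Linux, /proc/<pid>/environ is a sequence of KEY=VALUE entries separated by NUL bytes, so the scanning step comes down to parsing that format and looking for a marker variable. The sketch below shows the idea only; it is not Jenkins's ProcessTree code. EnvironScanner is a hypothetical class, and JOB_COOKIE is a made-up marker name that a launcher would have to set when starting the job.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

final class EnvironScanner {

    /** Parses the NUL-separated KEY=VALUE entries of /proc/<pid>/environ. */
    static Map<String, String> parse(byte[] raw) {
        Map<String, String> env = new LinkedHashMap<>();
        String text = new String(raw, StandardCharsets.UTF_8);
        for (String entry : text.split("\0")) {
            int eq = entry.indexOf('=');
            if (eq > 0) {
                env.put(entry.substring(0, eq), entry.substring(eq + 1));
            }
        }
        return env;
    }

    /** True if the environment carries the marker variable with this value. */
    static boolean hasMarker(byte[] raw, String name, String value) {
        return value.equals(parse(raw).get(name));
    }

    /** Reads a live process's environment on Linux; empty if unreadable. */
    static Map<String, String> readEnviron(long pid) {
        try {
            return parse(Files.readAllBytes(Paths.get("/proc", Long.toString(pid), "environ")));
        } catch (IOException e) {
            return new LinkedHashMap<>();
        }
    }

    public static void main(String[] args) {
        // JOB_COOKIE is a hypothetical marker name used only for this example.
        byte[] raw = "PATH=/usr/bin\0JOB_COOKIE=job-42\0".getBytes(StandardCharsets.UTF_8);
        System.out.println(hasMarker(raw, "JOB_COOKIE", "job-42"));
    }
}
```

Because child processes inherit the environment by default, a marker set on the top-level process shows up in everything it spawns, which is what makes this kind of scan workable.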

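Summary

  1. Process.destroy() is not guaranteed to always succeed. According to the comments, getting a 100% success rate means implementing your own subclass, which involves cross-platform work and is a hassle.
  2. If you use xxl-job's GLUE(SHELL), don't expect to be able to terminate the processes that the shell started.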