Bpazy / blog

我的博客,欢迎关注和讨论
https://github.com/Bpazy/blog/issues
MIT License
39 stars 2 forks source link

XxlJob 无法 restart #267

Open Bpazy opened 1 year ago

Bpazy commented 1 year ago

概要

接入 Apollo 后,准备把 XxlJob 的启停也接入配置中心,和 MQ 保持一致,这样就可以在配置中心统一管理 MQ, XxlJob 的流量了。但是在接入的过程中,发现了问题,表现为 XxlJob destroy 再 start 后,XxlJob 控制台看不到重新启动的执行器。

本文旨在分析原因,以及提供解决方案。

原因分析

首先看一个启动 XxlJob 执行器的案例:

@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppName(appName);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

    return xxlJobSpringExecutor;
}

这一步做了哪些事情?初始化了一个 XxlJobSpringExecutor,并将 XxlJobSpringExecutor 交给 Spring 管理,Bean 初始化完毕后,会调用 start 方法,Bean 销毁时,会调用 destroy 方法。

那么 XxlJobSpringExecutor.start 方法做了什么?

@Override
public void start() throws Exception {

    // init JobHandler Repository
    initJobHandlerRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    super.start();
}
  1. 通过 Spring ApplicationContext 获取 JobHandler;
  2. 设置 GLUE 参数;
  3. 调用父类 start 方法;

前两点没有疑问,再分析下父类 start 方法:

public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);

    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    initRpcProvider(ip, port, appName, accessToken);
}

做了哪些事?

  1. 初始化 Log Path;
  2. 初始化 Admin 连接信息;
  3. 初始化 JobLogFileCleanThread 和 TriggerCallbackThread,注意这里的 getInstance 方法,意味着这两个类是全局唯一实例;
  4. 初始化 XxlRPC;

再看 TriggerCallbackThread.start 方法:

public void start() {

    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    // callback
    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // normal callback
            while(!toStop){
                try {
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {

                        // callback list param
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);

                        // callback, will retry if error
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            // last callback
            try {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();

    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while(!toStop){
                try {
                    retryFailCallbackFile();
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
        }
    });
    triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();

}

可以看到 TiggerCallbackThread.start 方法初始化了 2 个 Thread: triggerCallbackThread, triggerRetryCallbackThread,并且内部都有一个判断 TiggerCallbackThread.toStop 属性的方法,这是非常大的疑点了,只要 toStop 为 true,那么这两个名称为“触发执行器”的线程就会关闭。

image

toStop() 方法,将 toStop 属性设置为 true,toStop() 方法又被 XxlJobExecutor.destroy() 方法调用:

public void destroy(){
    // destory jobThreadRepository
    if (jobThreadRepository.size() > 0) {
        for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
            removeJobThread(item.getKey(), "web container destroy and kill the job.");
        }
        jobThreadRepository.clear();
    }
    jobHandlerRepository.clear();

    // destory JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().toStop();

    // destory TriggerCallbackThread
    TriggerCallbackThread.getInstance().toStop();

    // destory executor-server
    stopRpcProvider();

    // destory invoker
    stopInvokerFactory();
}

到这里原因已经分析结束。那怎么解决该问题?分析 destory() 除了设置 toStop 属性还干了哪些 start 不能重置的属性,记得刚才看到的 getInstance 吗,这里可以大胆的猜测:

  1. 只有 JobLogFileCleanThread, TriggerCallbackThread, ExecutorRegistryThread 这 3 个单实例类属性未被重置;
  2. 全局搜索 toStop 属性,涉及到的类都需要重置;

不讲武德的将 com.xxl.job.core.thread 包下的 ExecutorRegistryThread, JobLogFileCleanThread, TriggerCallbackThread 三个类复制到工程代码中,用于覆盖 jar 代码: image

分别将三个 Thread 中的 start 方法第一行增加 this.toStop = false; 。

再次启动应用测试: 在配置中心里关闭再开启 XxlJob: image

查看 XxlJob 控制台执行器 IP 地址: image

执行调度任务,查看调度日志: image image

证实可以正常调度到应用上。

但是我们应该这样做吗,不应该,为什么?

  1. XxlJob 官方未提供 restart 这种功能,自己实现有风险;

  2. 复制类并修改代码属于奇技淫巧;

    总结

  3. 配置中心不要接入复杂逻辑;

  4. 利用工具自带的控制台达到目的,我们可以用 xxljob 控制台关闭特定机器流量;

  5. 写代码一定要控制自己,少用单例,少用单例,少用单例。Spring 天生就和单例八字不合。