GuanceCloud / dd-trace-java

Datadog APM client for Java
https://docs.datadoghq.com/tracing/languages/java
Apache License 2.0
9 stars 3 forks source link

xxl-job 对JobThread 进行链路传递增强处理 #75

Closed lrwh closed 2 months ago

lrwh commented 3 months ago

JobThread 的执行流程,线程 start 后,会执行 run 函数,run 函数内嵌 while 死循环,且循环体内自带了一个队列 triggerQueue 用于接收执行任务,根据执行任务再进行下一步动作。基本上没有合适的切入点进行埋点。 以下是 JobThread 部分代码

    public JobThread(int jobId, IJobHandler handler) {
        ......
    }

    public void run() {
        System.out.println("do run start .....");
        // init
        try {
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // execute
        while(!toStop){
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            try {
                // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;

                    XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
                    ...
                    handler.execute();
                    ...

                } else {
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                }
                XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
            } finally {
               ...
            }
        }
    }

埋点尝试

对 JobThread 的 构造函数run 函数进行增强。这种方式确实能够实现链路的透传。具体流程如下:

  1. JobThread构造函数构造了初始线程,可以将当前上下文的 span 信息透传进来。

  2. run 方法启动后,整个代码在同一个线程当中。也就意味着从 run 方法开始到结束,只能存在一个 span 信息,也就说明 traceid 这个时候是固定的。

  3. run 方法内部存在while循环,且存在任务队列 triggerQueue,并进行消费,任务来自于 admin 的下发。构造函数run 本质上是处于同一个线程当中,也就意味着首次消费任务,会把 JobThread 的构造函数的 span 信息作为当前的依据。

看似基本上实现了链路的串联,但仍有比较严重的问题:

  1. 上面已经说过了,run 方法从开始到结束,代码都在同一个线程内执行。

  2. 对于同一个任务,admin 在下发的时候存在不同的策略,比如定时执行,如果当前任务尚未结束,下次执行则会加入到队列当中,然后被 run 方法进行消费。这个时候存在 admin 的 trace 信息与 run 的上下文不匹配。会被 run 作为原始第一次的链路衔接。而后续的调用使用的是新的traceid,无法纳入同一个链路上。

结论

lrwh commented 2 months ago

无解