huanzhiyazi / articles

我的原创文章,包括且不限于技术 blog,历史,文学写作,日常心得等
103 stars 32 forks source link

技术 / Java / Executors.newCachedThreadPool如何做到线程缓存的 #8

Open huanzhiyazi opened 4 years ago

huanzhiyazi commented 4 years ago

目录



1 方法定义[Top]

CachedThreadPool 的定义如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

定义说明:



2 疑问[Top]

粗一看,这样设置后,工作池似乎不能缓存线程,理由如下:



3 ThreadPoolExecutor.execute() 方法核心代码[Top]

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();

    // a路径:核心线程数为 0,此处不会执行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // b路径:初始提交任务,SynchronousQueue 入队失败,若有缓存线程正在取任务,则入队成功
    if (isRunning(c) && workQueue.offer(command)) { 
        // offer 的任务如果成功,应该会马上被 getTask() 中的缓存线程 poll() 走,
        // 所以这里的 addWorker() 应该不会被执行到
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

    // c路径:初始提交任务将在此处生成新的线程执行
    else if (!addWorker(command, false)) 
        reject(command);
}



4 关键点: 线程存活时间 keepAliveTime[Top]

keepAliveTimeSynchronousQueue 缓存任务提供了一个缓冲时间:



5 两个关键方法[Top]

第一个关键方法是工作池中的线程执行任务的方法——runWorker()

final void runWorker(Worker w) {
    // 此处省略初始化代码

    try {
        // 若执行完第一个任务,则通过 getTask() 从队列中取缓存任务,若取到则执行,线程被复用
        while (task != null || (task = getTask()) != null) { 
            // 此处省略任务执行代码
        }
        // 省略
    } finally {
        // 任务队列取不到任务,进入尝试清理阶段
        processWorkerExit(w, completedAbruptly); 
    }
}

第二个关键方法是工作池从任务队列中取缓存任务的方法——getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // 省略初始化和线程池状态迁移代码

        int wc = workerCountOf(c);

        // 标记需要超时等待 timed == true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 

        // 省略工作池满提前退出代码

        try {
            Runnable r = timed ?
                // 任务执行完,60s 等待从 SynchronousQueue 中取任务
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}



6 流程总结[Top]

以下是 CachedThreadPool 运行机制的结构图,可以大概描述其运行流程:

CachedThreadPool structure

这里需要注意的是,如果客户端一次提交的任务太多,则工作池中没有足够的闲置线程来从任务队列中 poll 缓存任务,也就是说,客户端提交的任务里只有一部分可以在任务队列中插入成功,剩下的任务全部直接进入工作池中,并生成新的线程来执行。



7 newCachedThreadPool 适用场景[Top]

CachedThreadPool 适合大量短时间任务并发执行,因为:

同时,CachedThreadPool 不适合长时间执行的任务,因为长任务会长期占用当前线程,当前线程难以被及时派去任务队列取下一个任务,新提交的任务总是无法插入到任务队列,从而不得不总是生成新的线程来执行,线程复用率过低。此时若同时运行的线程数过多,又会增加线程创建和切换开销,严重影响性能。