fxleyu / cu-cafes

This is a repository of Java. And CU is a cafes unders downstairs.
2 stars 0 forks source link

[基础] 熟悉 ThreadPoolExecutor 底层实现 #60

Open fxleyu opened 6 years ago

fxleyu commented 6 years ago

背景

今晚组内有次线程池内容分享,收获很多,但也有很多不明白的地方。之前看过这块代码,但看的很糙。综合一下,感觉自己接触到了线程池的本质(一个生产者-消费者模型)。借此机会,希望系统深入地理解线程池的实现。

目录

fxleyu commented 6 years ago

JDK 1.6

来自于 jdk 1.6.0_45

1 执行任务

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // poolSize 当前存在的线程数
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            // addIfUnderCorePoolSize 失败,添加到阻塞队列 workQueue 中
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

2 当执行的线程数小于核心线程数

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

3 阻塞队列被填满后

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

4 线程池中线程的执行

        public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                // getTask 是从任务队列(阻塞队列)中获取任务
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

5 实际任务执行

        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * If pool is stopping ensure thread is interrupted;
                 * if not, ensure thread is not interrupted. This requires
                 * a double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
                if ((runState >= STOP ||
                    (Thread.interrupted() && runState >= STOP)) &&
                    hasRun)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

6 当任务执行出现异常后

    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }