msforest / notebook

好记性不如烂笔头,记录知识的点点滴滴。
https://github.com/msforest/notebook/wiki
0 stars 0 forks source link

async2.6.1源码分析之auto #29

Open msforest opened 6 years ago

msforest commented 6 years ago

两个知识点:

  1. 拓扑排序算法
  2. 生产者消费者模式

代码中都有指出

function auto(tasks, concurrency, callback) {
    if (typeof concurrency === 'function') {
        // concurrency is optional, shift the args.
        callback = concurrency;
        concurrency = null;
    }
    callback = once(callback || noop);
    var numTasks = Object.keys(tasks).length;
    if (!numTasks) {
        return callback(null);
    }
    if (!concurrency) {
        concurrency = numTasks;
    }

    var results = {};   // 保存task的结果
    var runningTasks = 0;   // 用于标志队列是否执行完毕或是否还有剩余队列可执行
    var canceled = false;   // 是否撤销async调用
    var hasError = false;   // 是否提前退出调用

    var listeners = Object.create(null);

    var readyTasks = [];

    // for cycle detection:
    var readyToCheck = []; // tasks that have been identified as reachable
    // without the possibility of returning to an ancestor task
    var uncheckedDependencies = {};

    // 1。为拓扑排序检查做数据准备
    Object.keys(tasks).forEach(key => {
        var task = tasks[key]
        if (!Array.isArray(task)) {
            // no dependencies
            enqueueTask(key, [task]); // 度为0
            readyToCheck.push(key);
            return;
        }

        var dependencies = task.slice(0, task.length - 1);
        var remainingDependencies = dependencies.length;
        if (remainingDependencies === 0) {
            enqueueTask(key, task); // 度为0
            readyToCheck.push(key);
            return;
        }
        uncheckedDependencies[key] = remainingDependencies;

        dependencies.forEach(dependencyName => {
            if (!tasks[dependencyName]) {
                throw new Error('async.auto task `' + key +
                    '` has a non-existent dependency `' +
                    dependencyName + '` in ' +
                    dependencies.join(', '));
            }
            addListener(dependencyName, () => {
                remainingDependencies--;
                if (remainingDependencies === 0) {
                    enqueueTask(key, task); // 度不为0的key,只是依赖的task全部执行完才加入到最后的任务队列中
                }
            });
        });
    });

    checkForDeadlocks();    // 2. 检查依赖关系是否存在循环♻️
    processQueue(); // 开始执行队列

    // 把入度为0的点加入队列
    function enqueueTask(key, task) {
        readyTasks.push(() => runTask(key, task));
    }

    function processQueue() {
        if (canceled) return    // 撤销async.auto()程序
        if (readyTasks.length === 0 && runningTasks === 0) {
            return callback(null, results);
        }
        // 优先执行度为0的点
        while(readyTasks.length && runningTasks < concurrency) { // 设置并发限制,等待前一轮的某一个task执行结束再增加下一个task
            var run = readyTasks.shift();
            run();  // 执行runTask
        }

    }

    // 相当于生产者,根据taskName往池子(listeners)里生产东西
    function addListener(taskName, fn) {
        var taskListeners = listeners[taskName];
        if (!taskListeners) {
            taskListeners = listeners[taskName] = [];
        }

        taskListeners.push(fn);
    }

    function taskComplete(taskName) {
        var taskListeners = listeners[taskName] || [];
        taskListeners.forEach(fn => fn());  //类似于消费者,根据taskName获取池子里的东西,使得每一个依赖taskName、度不为0的点减1,直至为0
        processQueue();
    }

    function runTask(key, task) {
        if (hasError) return;

        var taskCallback = onlyOnce((err, ...result) => {
            runningTasks--; // 执行队列-1
            if (err === false) {
                canceled = true
                return
            }
            if (result.length < 2) {
                [result] = result;
            }
            if (err) {
                var safeResults = {};
                Object.keys(results).forEach(rkey => {
                    safeResults[rkey] = results[rkey];
                });
                safeResults[key] = result;
                hasError = true;
                listeners = Object.create(null);
                if (canceled) return
                callback(err, safeResults);
            } else {
                results[key] = result;
                taskComplete(key);
            }
        });

        runningTasks++; // 执行队列+1
        var taskFn = wrapAsync(task[task.length - 1]);
        if (task.length > 1) {
            taskFn(results, taskCallback);
        } else {
            taskFn(taskCallback);
        }
    }

    /**
     * 拓扑排序
     * // https://acm.sjtu.edu.cn/w/images/4/4e/%E6%8B%93%E6%89%91%E6%8E%92%E5%BA%8F.pdf
     * 把入度为0的点放入队列
     * while(队列非空){
     *      now = 弹出对头
     *      for(遍历now的所有出边){
     *          to = 出边
     *          du[to]--;
     *          if(!du[to]) to入队
     *      }
     *      // now出列,放入结果序列
     * }
     *
     * wiki:
     * L 表示排好序的队列
     * S 表示度为0的边
     * while(S非空){
     *      N = S.shift()
     *      L.push(N);
     *      for(遍历N的出边){
     *          m = N指向的点
     *          从graph移除指向m的边
     *          if(m度为0) S.push(m)
     *      }
     * }
     *
     * if(graph还有边) return error(至少有一个循环♻️)
     * else return L (拓扑排序的结果)
     */
    function checkForDeadlocks() {
        // Kahn's algorithm
        // https://en.wikipedia.org/wiki/Topological_sorting#Kahn.27s_algorithm
        // http://connalle.blogspot.com/2013/10/topological-sortingkahn-algorithm.html
        var currentTask;
        var counter = 0;
        while (readyToCheck.length) {
            currentTask = readyToCheck.pop();
            counter++;
            getDependents(currentTask).forEach(dependent => {
                if (--uncheckedDependencies[dependent] === 0) {
                    readyToCheck.push(dependent);
                }
            });
        }

        if (counter !== numTasks) { // 若队列里的个数不等于人物队列的个数,则判定为存在循环依赖♻️
            throw new Error(
                'async.auto cannot execute tasks due to a recursive dependency'
            );
        }
    }

    // 获取度为0指向的出边数据
    function getDependents(taskName) {
        var result = [];
        Object.keys(tasks).forEach(key => {
            const task = tasks[key]
            if (Array.isArray(task) && task.indexOf(taskName) >= 0) {
                result.push(key);
            }
        });
        return result;
    }
}