msforest / notebook

好记性不如烂笔头,记录知识的点点滴滴。
https://github.com/msforest/notebook/wiki
0 stars 0 forks source link

async2.6.1源码分析之parallel #27

Open msforest opened 6 years ago

msforest commented 6 years ago

分析参数为数组的情况

// entry
function parallel(tasks, callback) {  // parallelLimit -> parallel
    _parallel(eachOf, tasks, callback);
}

function _parallel(eachfn, tasks, callback) {
    callback = callback || noop;
    var results = isArrayLike(tasks) ? [] : {};

    eachfn(tasks, (task, key, taskCb) => {  // task如果是异步的,会被wrapAsync同步化
        wrapAsync(task)((err, ...result) => {
            if (result.length < 2) {
                [result] = result;
            }
            results[key] = result;
            taskCb(err);
        });
    }, err => callback(err, results));
}

function eachOf(coll, iteratee, callback) {
    var eachOfImplementation = isArrayLike(coll) ? eachOfArrayLike : eachOfGeneric;
    eachOfImplementation(coll, wrapAsync(iteratee), callback);
}

function eachOfArrayLike(coll, iteratee, callback) {
    callback = once(callback || noop);
    var index = 0,
        completed = 0,
        {length} = coll,
        canceled = false;  // 表示任务队列是否撤销,嗯,用撤销更好理解
    if (length === 0) {
        callback(null);
    }

    function iteratorCallback(err, value) {
        if (err === false) {
            canceled = true
        }
        if (canceled === true) return   // 退出执行流程,这里的退出不是指执行async.parallel([], callback)里的callback,而是撤销async.parallel()
        if (err) {
            callback(err);  // 若某个task中间有错误,提前退出parallel,已经发起的iteratee的异步任务可能会被执行,但是最后的result只保存了已经执行过的结果
        } else if ((++completed === length) || value === breakLoop) {
            callback(null);
        }
    }

    for (; index < length; index++) {
        iteratee(coll[index], index, onlyOnce(iteratorCallback));   // 重点,只是利用js的异步机制,如果函数没有异步操作,执行还是串行的
    }
}

并行和并发的区别

看过代码之后,才发现不是真正意义上的parallel

parallel功能和Promise.all方法一样

msforest commented 6 years ago

参数为Object类型

主要区别在于eachOf的处理不同

var eachOfGeneric = doLimit(eachOfLimit, Infinity);  // 控制并发的数量

function doLimit(fn, limit) {
    return (iterable, iteratee, cb) => fn(iterable, limit, iteratee, cb)
}

function eachOfLimit(coll, limit, iteratee, callback) {
    _eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
}

function _eachOfLimit(limit) => {
    return (obj, iteratee, callback) => {
        callback = once(callback || noop);
        if (limit <= 0) {
            throw new RangeError('concurrency limit cannot be less than 1')
        }
        if (!obj) {
            return callback(null);
        }
        var nextElem = _iterator(obj);  // 获取要执行的task
        var done = false;   //  任务队列是否退出
        var canceled = false;   // 如果其中一个err===false,就停止后面的task,已经在执行栈中的task会继续执行,未加入的task不会再执行了,且已经执行的result无效了,因为callback
        var running = 0;    // 并行运行的标志,当达到limit限制的时候,就停下来。当前一个task执行完一个,就执行下一个task,并不是说:limit=3时,就等待三个全部执行完再接着执行下一轮循环的三个
        var looping = false;    // 是否继续再次执行replenish函数,当limit===Infinity,此标志无效

        function iterateeCallback(err, value) {
            if (canceled) return
            running -= 1;   // task执行完,执行栈减1
            if (err) {
                done = true;
                callback(err);
            }
            else if (err === false) {
                done = true;
                canceled = true;
            }
            else if (value === breakLoop || (done && running <= 0)) {
                done = true;
                return callback(null);
            }
            else if (!looping) {
                replenish();
            }
        }

        function replenish () {
            looping = true;
            while (running < limit && !done) {
                var elem = nextElem();
                if (elem === null) {
                    done = true;
                    if (running <= 0) {
                        callback(null);
                    }
                    return;
                }
                running += 1;   // task加入任务队列,执行栈加1
                iteratee(elem.value, elem.key, onlyOnce(iterateeCallback));
            }
            looping = false;
        }

        replenish();
    };
}

// 模拟实现一个iterator遍历器
function _iterator(coll) {
    if (isArrayLike(coll)) {
        return createArrayIterator(coll);
    }

    var iterator = getIterator(coll);
    return iterator ? createES2015Iterator(iterator) : createObjectIterator(coll);
}

function createArrayIterator(coll) {
    var i = -1;
    var len = coll.length;
    return function next() {
        return ++i < len ? {value: coll[i], key: i} : null;
    }
}

function createES2015Iterator(iterator) {
    var i = -1;
    return function next() {
        var item = iterator.next();
        if (item.done)
            return null;
        i++;
        return {value: item.value, key: i};
    }
}

function createObjectIterator(obj) {
    var okeys = obj ? Object.keys(obj) : [];
    var i = -1;
    var len = okeys.length;
    return function next() {
        var key = okeys[++i];
        return i < len ? {value: obj[key], key} : null;
    };
}

function isArrayLike(value) {
    return value &&
        typeof value.length === 'number' &&
        value.length >= 0 &&
        value.length % 1 === 0;
}

function getIterator(coll) {
    return coll[Symbol.iterator] && coll[Symbol.iterator]();    // coll instanceof Set/Map
}