fedono / fe-questions

1 stars 0 forks source link

19. 实现Promise.all/race 实现并发控制asyncLimit #20

Open fedono opened 4 years ago

fedono commented 4 years ago

考察Promise 一般都会让你说 Promise 的原理,比如如何实现链式调用,then 是如何实现的,参考下实现 Promise 需要说明的是,如果你说 then 是再new 一个promise ,这个是自己实现的then,不是原生的 promise实现的then

实现 ALL

function promiseAll(promises) {
    return new Promise(function(resolve, reject) {
        if (!Array.isArray(promises)) {
            throw new TypeError('argument must be a array');
        }

        let resolvedCounter = 0;
        let promiseNum = promises.length;
        let resolvedResult = [];

        for (let i = 0; i < promiseNum; i++) {
            Promise.resolve(promises[i]).then(value => {
                resolvedCounter++;
                resolvedResult[i] = value;
                if (resolvedCounter === promiseNum) {
                    return resolve(resolvedResult);
                }
            }, error => {
                return reject(error);
            })
        }
    })
}

测试:

let p1 = new Promise((resolve, reject) => {
    setTimeout(() => {
        resolve(1)
    }, 1000)
});

let p2 = new Promise((resolve, reject) => {
    setTimeout(() => {
        resolve(2)
    }, 1000)
});

let p3 = new Promise((resolve, reject) => {
    setTimeout(() => {
        resolve(3)
    }, 1000)
});

promiseAll([p3, p1, p2]).then(res => {
    console.log(res)
})
fedono commented 3 years ago

拓展题 使用 Promise 实现并发控制

// 实现并发控制3个
// 原理就是使用 Promise.race 来实现当前这个执行完成了,就可以将下一个推进数组中去
function limitLoad(urls, handler, limit) {
    const sequence = [].concat(urls);
    let promises = [];

    // 这里有一个非常关键的点,就是这里的 return index
    // 这个index来控制当前是第几个请求,来保证队列里面当 index 执行完成之后,
    // 就把新的请求放到第 index 个中去
    promises = sequence.splice(0, limit).map((url, index) => {
        return handler(url).then(() => {
            return index;
        });
    });

    // 核心关键点就是这个 race,来检测其中的请求已经完成了
    let p = Promise.race(promises);
    for(let i = 0; i < sequence.length; i++) {
        p = p.then(res => {
            // 这里的 res 就是上面的 index,每次添加都是添加到已完成的请求占据的队列中的位置
            promises[res] = handler(sequence[i]).then(() => {
                return res;
            });
            return Promise.race(promises);
        });
    }
}

测试


const urls = Array.from({length: 9}).map((v, i) => {
    return {
        info: 'url ' + i,
        time: 500 * i
    }
})

function loadImg(url) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            console.log(url.info + '--- url info');
            resolve()
        }, url.time);
    })
}

limitLoad(urls, loadImg, 3); // 每次都是三个请求
fedono commented 3 years ago

上面那个是在实际工作中可以用到的,有点不太好理解,所以这里简化一下

function asyncLimit(requests, limit) {
    let requestLimits = requests.splice(0, limit).map((request, i) => {
        // 在 map 里面,每个请求里面的 then 都要返回一个 i
        // 用来在 race 执行完成之后,添加过来的知道如何替换掉已经执行过的请求
        return request().then(() => i);
    });

    let p = Promise.race(requestLimits);
    for (let i = 0; i < requests.length; i++) {
        // 主要是没有明白这里的 p = p.then 是什么意思?
        // 每次都需要更新一次 p,要不然每次都是第一次的 Promise.race(requestLimits) ?
        // 如果这里只是 p.then 而不是 p = p.then 那么 requestLimits中第一次会输出一次结果,然后 requests 中的结果会一次性全输出,总共输出两次
        // 每次都要 p = p.then 应该是每次都要更新p,因为每次在 race 中都有一个函数执行完成了,所以需要更新一下
        p = p.then(index => {
        // p.then(index => {
            // 一定要在这里加上 .then(() => index) 这样才能把 index 传给下一个函数
            requestLimits[index] = requests[i]().then(() => index);
            // 每次 race 中的函数执行完成后,都是返回 index,所以这里要 return Promise.race,来让下一个 then 中接受的参数是 index
            return Promise.race(requestLimits);
        });
    }
}

测试


createPromise = (n, i) => () => {
    return new Promise(resolve => {
        setTimeout(() => {
            console.log(i);
            resolve(n);
        }, n * 1000);
    })
}

createMultiplePromise = n => {
    let res = [];
    Array.from({length: n}).map((_, i) => {
        res.push(createPromise(1, i));
    });
    return res;
};

let promises = createMultiplePromise(8);
asyncLimit(promises, 3);
fedono commented 1 year ago

来个更简单的

const asyncLimit = async (queue, concurrency) => {
  let index = 0;
  const results = []

  const execThread = async () => {
    while (index < queue.length) {
      // Use of `curIndex` is important because `index` may change after await is resolved
      const curIndex = index++;
      // 这里的重点就是,只有在 await 执行完成后,才会进入下一个 while 循环,
      // 所以下面这个 for 渲染,有  concurrency 个并行就行
      results[curIndex] = await queue[curIndex]()
    }
  }

  const threads = []
  for (let thread = 0; thread < concurrency; thread++) {
    threads.push(execThread())
  }

  await Promise.all(threads)

  return results
}

参考 Promise All with Limit of Concurrent N

fedono commented 1 year ago

再来个简单的 async limit

async function limit(tasks, concurrency) {
  const results = [];

  async function runTasks(tasksIterator) {
    for (const [index, task] of tasksIterator) {
      try {
        results[index] = await task();
      } catch (error) {
        results[index] = new Error(`Failed with: ${error.message}`);
      }
    }
  }

  const workers = new Array(concurrency).fill(tasks.entries()).map(runTasks);

  await Promise.allSettled(workers);

  return results;
}

const timeout = (time, i) =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve(i);
      console.log(i, '--- time');
    }, time * 1000);
  });

async function main() {
  const tasks = [
    () => timeout(1, 1),
    () => timeout(1, 2),
    () => timeout(2, 3),
    () => timeout(1, 4),
    () => timeout(1, 5),
    () => timeout(1, 6)
    // ...
  ];

  const results = await limit(tasks, 3);
}

main();

来源 - parallel-tasks-with-pure-javascript