wangjs-jacky / js-challenges

0 stars 0 forks source link

实现 Promise 队列的并发操作,支持控制并发个数 #7

Open wangjs-jacky opened 11 months ago

wangjs-jacky commented 11 months ago

测试代码:

function sleep(text, delay = 1000) {
  return () => new Promise(resolve => {
    setTimeout(() => {
      console.log(text)
      resolve();
    }, delay);
  });
}

const tasks = [1, 2, 3, 4, 5].map((i) => {
  return sleep(i)
})

测试代码:asyncPool(tasks, 2)

预期结果:

wangjs-jacky commented 11 months ago

维护两个数组:

  1. allTasks
  2. poolTasks

取出 tasks[i] 作为当前的任务:

allTaskstask[i] poolTaskse = task[i].then(()=>{poolTasks.splice(poolTasks.indexOf(e),1)})

通过 Promise.race + await 控制 poolTasks 始终在并发数范围内,最后通过 Promise.all 实现所有任务的并发。

const asyncPool = async (tasks, poolLimit) => {
  /* 所有异步任务执行状态 */
  const allTasks = [];
  /* 正在执行的任务数组 */
  const poolTasks = [];

  for (let i = 0; i < tasks.length; i++) {
    const curTask = Promise.resolve(tasks[i]());
    allTasks.push(curTask);

    /* 当 poolLimit <= tasks.length 时,实现并发控制 */
    if (poolLimit <= tasks.length) {
      /* 在原有异步包裹处理操作 */
      const e = curTask.then(() => {
        /* 成功后,从正在执行的任务数组中删除 */
        poolTasks.splice(poolTasks.indexOf(e), 1)
      })
      poolTasks.push(e);

      /* poolTasks 持续增加会超出限制数量 */
      if (poolTasks.length >= poolLimit) {
        /* 始终控制 poolTasks 的数量 */
        await Promise.race(poolTasks);
      }
    }
  }

  /* 此时 allTasks 中剩余 pending < poolTasks */
  return Promise.all(allTasks);
}

asyncPool(tasks, 2)

参考实现:asyncPool 的使用

wangjs-jacky commented 11 months ago

使用 koa 内的 compose 函数思想,构建 dispatch 函数。

对于同步任务的递归为:

const looptask = () => {
  looptask()
}

对于异步任务的递归:

const dispatch = () => {
  asyncFn().then(() => {
    dispatch();
  })
}

基于这个思想,结合 Promise.all 的源码实现。

/* 使用 dispatch 实现 */
const asyncPool = (tasks, poolLimit) => {
  return new Promise((resolve, reject) => {
    const result = [];
    let resolveCount = 0;
    let currentIndex = 0;

    const dispatch = () => {
      const curTask = Promise.resolve(tasks[currentIndex]());
      const index = currentIndex;
      currentIndex++;
      /* 异步任务的递归,通过 .then 实现 */
      curTask.then(res => {
        result[index] = res;
        resolveCount++;
        if (resolveCount === tasks.length) {
          resolve(res);
        }

        /* 递归的触发(currentIndex指针还未触发) */
        if (currentIndex < tasks.length) {
          dispatch();
        }
      });
    }

    for (let i = 0; i < poolLimit && i < tasks.length; i++) {
      dispatch();
    }
  })
}

asyncPool(tasks, 2)