su37josephxia / frontend-interview

前端面试知识点
MIT License
159 stars 45 forks source link

如何实现一个并发函数 #116

Open su37josephxia opened 2 years ago

RJM1996 commented 2 years ago

JS 实现并发控制

背景

当我们有若干任务需要同时执行, 例如同时发起多个异步请求, 但是系统资源有限, 不能同时处理这么多请求, 所以我们需要控制同时执行任务的数量, 我们把它叫做 并发控制

分析

在这个场景中, 我们有 3 个需要关注的对象:

  1. 待运行任务的列表 tasks
  2. 同时运行任务的列表 pools
  3. 最大并发数(即同时运行任务的个数)max

一般这里的任务都是指异步任务, 我们把每个任务都包装为一个 Promise 对象, 方便获取任务运行结果, 用数组模拟任务列表, 整个流程分为如下几步:

  1. 遍历待运行任务列表 tasks, 将每个任务包装为一个 Promise 对象
  2. 将生成的 Promise 对象存到 pools 中
  3. 当 pools 中的任务数量等于最大并发数 max 时, 就执行 pools 中的任务
  4. 当该 Promise 对象 reslove 时, 就将其从 pools 中删除

实现

// 模拟异步请求
const request = (url) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(`任务 ${url.padEnd(10, ' ')}完成`)
    }, 1000)
  })
}

/**
 * 并发控制函数
 * @param {*} tasks
 * @param {*} max
 */
async function myAsyncPool(tasks = [], max = 3) {
  // 正在执行的任务数组
  let pool = []
  for (let i = 0; i < tasks.length; i++) {
    // 生成异步任务
    const task = request(tasks[i])
    // 添加到正在执行的任务数组
    pool.push(task)
    task.then((data) => {
      // 当任务执行完毕, 将其从正在执行任务数组中移除
      console.log(`${data}; 当前并发数: ${pool.length}`)
      pool.splice(pool.indexOf(task), 1)
    })

    // 当并发池满了, 就先去执行并发池中的任务, 有任务执行完成后, 再继续循环
    if (pool.length === max) {
      await Promise.race(pool)
    }
  }
}
const tasks = new Array(10).fill('').map((task, i) => `url - ${i + 1}`)
myAsyncPool(tasks, 3)

执行结果演示

实际每次是 3 个一组打印出来的

任务 url - 1   完成; 当前并发数: 3
任务 url - 2   完成; 当前并发数: 3
任务 url - 3   完成; 当前并发数: 3
任务 url - 4   完成; 当前并发数: 3
任务 url - 5   完成; 当前并发数: 3
任务 url - 6   完成; 当前并发数: 3
任务 url - 7   完成; 当前并发数: 3
任务 url - 8   完成; 当前并发数: 3
任务 url - 9   完成; 当前并发数: 2
任务 url - 10  完成; 当前并发数: 1

参考资源

一般我们都会用一些现成的库来进行并发的控制, 例如: async-pool

Limeijuan commented 2 years ago
/**
 * @params list {Array} - 要迭代的数组
 * @params limit {Number} - 并发数量控制数
 * @params asyncHandle {Function} - 对`list`的每一个项的处理函数,参数为当前处理项,必须 return 一个Promise来确定是否继续进行迭代
 * @return {Promise} - 返回一个 Promise 值来确认所有数据是否迭代完成
 */
let mapLimit = (list, limit, asyncHandle) => {
    let recursion = (arr) => {
        return asyncHandle(arr.shift()).then(()=>{
                if (arr.length!==0) return recursion(arr)   // 数组还未迭代完,递归继续进行迭代
                else return 'finish';
            })
    };

    let listCopy = [].concat(list);
    let asyncList = []; // 正在进行的所有并发异步操作
    while(limit--) {
        asyncList.push( recursion(listCopy) ); 
    }
    return Promise.all(asyncList);  // 所有并发异步操作都完成后,本次并发控制迭代完成
}

如何实现一个并发函数?

首先要考虑的是:

JanusJiang1 commented 2 years ago
async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // 存储所有的异步任务
  const executing = []; // 存储正在执行的异步任务
  for (const item of array) {
    // 调用iteratorFn函数创建异步任务
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // 保存新的异步任务
    // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // 保存正在执行的异步任务
      if (executing.length >= poolLimit) {
        await Promise.race(executing); // 等待较快的任务执行完成
      }
    }
  }
  return Promise.all(ret);
}
chunhuigao commented 2 years ago
Promise.all = function (promises) {
  let count = 0
  let len = promises.length
  let result = []
  return new Promise((resolve, reject) => {
    for (let i = 0; i < len; i++) {
      Promise.resolve(promises[i])
        .then((value) => {
          result[i] = value
          count++
          if (count === len) resolve(result)
        })
        .catch((e) => {
          reject(e)
        })
    }
  })
}
wzl624 commented 2 years ago

`async function asyncPool(poolLimit, array, iteratorFn) { const ret = []; const executing = []; for (const item of array) { const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p);

if (poolLimit <= array.length) {
  const e = p.then(() => executing.splice(executing.indexOf(e), 1));
  executing.push(e);
  if (executing.length >= poolLimit) {
    await Promise.race(executing);
  }
}

} return Promise.all(ret); }`

zcma11 commented 2 years ago

JavaScript 实现并发函数是使用了异步编程的方法,普通回调,generator,Promise 都可以实现。Promise版的思路是统计正在等待的 promise 数量,达到限制的最大并发数的时候,会进入暂停阶段,直到腾出一个位置才继续执行。

如果用计数法的话需要用一个定时器,让并发的任务有空隙执行。如果使用 Promise.race 则可以完全停止下来,不会继续走循环,性能可能更好。

async function asyFn(fun, max = 4, total = 200) {
  let completed = 0
  let working = []
  let result = []
  console.time('耗时')
  for (let i = 1; i <= total; i++) {
    const execute = Promise.resolve().then(async () => {
      const res = await fun()
      result.push(res)
      completed++
      console.log('并发:' + working.length, completed + '/' + total)
      // 成功后自己清除自己
      const j = working.findIndex(f => f === execute)
      j > -1 && working.splice(j, 1)
    }).catch(() => {
      // 失败也删除自己
      result.push(null)
      const j = working.findIndex(f => f === execute)
      j > -1 && working.splice(j, 1)
    })
    working.push(execute)
    // 进入等待
    if (working.length === max) {
      await Promise.race(working)
    }
  }
  console.timeEnd('耗时')
  // 等等最后的几个,返回返回值数组
  return Promise.all(working).then(() => result)
}

// 返回了每次的返回值,可能不需要
(async () => {
  const r = await asyFn(test, 4, 20)
  console.log(r)
})()
jj56313751 commented 2 years ago
rachern commented 2 years ago

并发函数要求同一时间任务队列中最多只能有 n 个任务在执行,如果超过了 n 个任务,那么后面的任务就得排队等待,等到执行中的任务有一个以上完成了,退出了任务队列,等待中的任务才能够进入任务队列去执行

const asyncFun => async (fun, maxNum = 4, sum = 200) {
  let num = 0
  let curNum = 0
  const tasks = []
  while (num < sum) {
    if (curNum < maxNum) {
      tasks.push(new Promise(async resolve => {
        res = await fun()
        curNum--
        resolve(res)
      })) else {
        sleep(10)
      }
    }
  }
}
crazyyoung1020 commented 2 years ago

可以利用promise实现一个并发任务函数。

一个并发任务函数一般至少有三个参数

  1. 最大并发量
  2. 任务列表
  3. 执行任务的方法

我们要保证的是,开始执行任务后,同时执行任务的数量不能超过最大并发数,并且最终将任务执行完。

那么我们可以利用promise来实现一个并发函数,大概逻辑如下

  1. 开始执行任务后,将每个任务包装成一个promise对象,并放入并发数组pool中和结果数组res中。
  2. 每个任务完成的时候,我们在promise的then回调里将当前任务从pool数组中删除
  3. 如果当前pool任务并发数不超过并发量限制,则继续下一个任务,如果达到限制了,则通过promise.race来等待一会,等任务池中的最快的任务执行完,腾出一个位置了,我们再继续去执行下一个任务。
  4. 当将所有任务都执行完后,我们通过promise.all去将所有任务的结果拿到。

代码如下

function asyncPool(limit, tasks, handleFun) {
  let i = 0;
  // 结果数组
  const res = [];
  // 并发任务数组
  const pool = [];
  const recursion = function() {
    // 计数,到最后一个任务了则停止递归
    if (i === tasks.length) {
      return Promise.resolve();
    }
    const item = tasks[i++];
    // 执行当前任务,并将promise放入结果数组里
    const p = Promise.resolve().then(() => handleFun(item));
    res.push(p);

    let r = Promise.resolve();

    // 如果并发量小于任务数,才需要做并发控制
    if (limit <= tasks.length) {
      // 任务结束后,将任务从并发池中删除
      const e = p.then(() => pool.splice(pool.indexOf(e), 1));
      // 将任务方法并发池
      pool.push(e);
      if (pool.length >= limit) {
        // 如果并发池达到并发上限了,停止任务执行,等并发池内的有任务结束才继续
        r = Promise.race(pool);
      }
    }

    return r.then(() => recursion());
  };
  return recursion().then(() => Promise.all(res));
}