Open alian926 opened 3 years ago
// Return a random integer in the closed interval [minimum, maximum].
const randomInteger = (minimum, maximum) => {
  // `+ 1` widens the span so that `maximum` itself can be produced.
  const span = maximum - minimum + 1;
  return Math.floor(Math.random() * span + minimum);
};
// Build the error used to reject a delay that was cancelled through an AbortSignal.
const createAbortError = () =>
  Object.assign(new Error('Delay aborted'), { name: 'AbortError' });
/**
 * Build a `delay(ms, options)` function bound to a specific timer implementation.
 *
 * @param {object} config
 * @param {Function} [config.clearTimeout] - Timer-cancel function; the global `clearTimeout` is used when omitted.
 * @param {Function} [config.setTimeout] - Timer function; the global `setTimeout` is used when omitted.
 * @param {boolean} config.willResolve - `true` to fulfil with `value` once the delay elapses, `false` to reject with it.
 * @returns {Function} `(ms, {value, signal})`: `ms` is the delay, `value` the settle value, `signal` an
 *   optional AbortSignal. The returned promise carries an extra `clear()` method.
 */
const createDelay =
  ({ clearTimeout: defaultClear, setTimeout: set, willResolve }) =>
  (ms, { value, signal } = {}) => {
    // An already-aborted signal short-circuits before any timer is scheduled.
    if (signal && signal.aborted) {
      return Promise.reject(createAbortError());
    }

    const cancelTimer = defaultClear || clearTimeout;
    const startTimer = set || setTimeout;
    let timerHandle;
    let finish;
    let rejectPromise;

    // Abort: stop the pending timer and reject with an AbortError.
    const onAbort = () => {
      cancelTimer(timerHandle);
      rejectPromise(createAbortError());
    };

    const detachSignal = () => {
      if (!signal) {
        return;
      }
      signal.removeEventListener('abort', onAbort);
    };

    const delayPromise = new Promise((resolve, reject) => {
      rejectPromise = reject;
      finish = () => {
        detachSignal();
        const conclude = willResolve ? resolve : reject;
        conclude(value);
      };
      timerHandle = startTimer(finish, ms);
    });

    if (signal) {
      signal.addEventListener('abort', onAbort, { once: true });
    }

    // Cancel the timer and settle right away with the configured outcome.
    delayPromise.clear = () => {
      cancelTimer(timerHandle);
      timerHandle = null;
      finish();
    };

    return delayPromise;
  };
/**
 * Create a `delay` function, optionally backed by caller-supplied timers.
 *
 * @param {{clearTimeout?: Function, setTimeout?: Function}} [clearAndSet] - Custom timer functions.
 * @returns {Function} `delay(ms, options)` with two extra members:
 *   `delay.reject` (same call shape, but rejects) and
 *   `delay.range(minimum, maximum, options)` (fulfils after a random whole number of ms in that closed range).
 */
const createWithTimers = clearAndSet => {
  const build = willResolve => createDelay({ ...clearAndSet, willResolve });

  // Fulfilling variant.
  const delay = build(true);
  // Rejecting variant.
  delay.reject = build(false);
  // Fulfil after a random duration picked from [minimum, maximum].
  delay.range = (minimum, maximum, options) =>
    delay(randomInteger(minimum, maximum), options);

  return delay;
};
// Default `delay` instance: no custom timers are supplied, so `createWithTimers` receives `undefined`.
const delay = createWithTimers();
// Expose the factory on the instance so callers can build a `delay` with their own timer functions.
delay.createWithTimers = createWithTimers;
export default delay;
/**
 * Report whether an iterable yields no items.
 *
 * Iteration starts and is abandoned as soon as the first item shows up.
 *
 * @param {Iterable<unknown>} iterable
 * @returns {boolean} `true` when nothing is yielded.
 */
function isEmptyIterable(iterable) {
  let isEmpty = true;
  for (const _ of iterable) {
    isEmpty = false;
    break;
  }
  return isEmpty;
}
export default isEmptyIterable;
// Sentinel a mapper can return to leave the current item out of the result array.
export const pMapSkip = Symbol('skip');

// Bundle every collected mapper failure into one error whose `errors` property lists them all.
const createAggregateError = (errors) => {
  const message = `${errors.length} mapper call(s) failed`;
  if (typeof AggregateError === 'function') {
    return new AggregateError(errors, message);
  }
  // Fallback for runtimes without a global AggregateError.
  const error = new Error(message);
  error.name = 'AggregateError';
  error.errors = [...errors];
  return error;
};

/**
 * Map over an iterable with an (optionally async) mapper, limiting how many calls run at once.
 *
 * @param {Iterable<unknown>} iterable - Items to map; each item is awaited before it reaches the mapper.
 * @param {Function} mapper - `mapper(element, index)`; may return a promise, or `pMapSkip` to drop the item.
 * @param {object} [options]
 * @param {number} [options.concurrency=Infinity] - Maximum number of mapper calls in flight (integer >= 1 or Infinity).
 * @param {boolean} [options.stopOnError=true] - `true`: reject with the first error. `false`: process every
 *   item, then reject with a single aggregate error whose `errors` array holds every failure.
 * @returns {Promise<unknown[]>} Mapped values in input order, without the skipped items.
 *   Rejects with a TypeError when `mapper` or `concurrency` is invalid.
 */
const pMap = async (
  iterable,
  mapper,
  { concurrency = Number.POSITIVE_INFINITY, stopOnError = true } = {}
) => {
  return new Promise((resolve, reject) => {
    // Argument validation; throwing inside the executor rejects the returned promise.
    if (typeof mapper !== 'function') {
      throw new TypeError('Mapper function is required');
    }
    if (
      !(
        (Number.isSafeInteger(concurrency) ||
          concurrency === Number.POSITIVE_INFINITY) &&
        concurrency >= 1
      )
    ) {
      throw new TypeError(
        `Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`
      );
    }

    // Mapped values, stored at the index of their source item.
    const result = [];
    // Errors collected while `stopOnError` is false.
    const errors = [];
    // Indexes whose mapper returned `pMapSkip`.
    const skippedIndexes = new Set();
    const iterator = iterable[Symbol.iterator]();
    let isRejected = false;
    let isIterableDone = false;
    // Number of items currently being processed.
    let resolvingCount = 0;
    let currentIndex = 0;

    const next = () => {
      if (isRejected) {
        return;
      }

      const nextItem = iterator.next();
      const index = currentIndex;
      currentIndex++;

      if (nextItem.done) {
        isIterableDone = true;
        // Settle only once the last in-flight task has finished.
        if (resolvingCount === 0) {
          if (!stopOnError && errors.length > 0) {
            reject(createAggregateError(errors));
          } else {
            // Build a dense result by index lookup. Removing entries in place would shift
            // the positions of later items and drop the wrong ones.
            const finalResult = [];
            for (let position = 0; position < result.length; position++) {
              if (!skippedIndexes.has(position)) {
                finalResult.push(result[position]);
              }
            }
            resolve(finalResult);
          }
        }
        return;
      }

      resolvingCount++;

      (async () => {
        try {
          const element = await nextItem.value;
          // Another task may have failed while this element was being awaited.
          if (isRejected) {
            return;
          }

          const value = await mapper(element, index);
          if (value === pMapSkip) {
            skippedIndexes.add(index);
          } else {
            result[index] = value;
          }

          resolvingCount--;
          next();
        } catch (error) {
          if (stopOnError) {
            isRejected = true;
            reject(error);
          } else {
            errors.push(error);
            resolvingCount--;
            next();
          }
        }
      })();
    };

    // Start up to `concurrency` tasks; each finished task pulls the next item itself.
    for (let index = 0; index < concurrency; index++) {
      next();
      if (isIterableDone) {
        break;
      }
    }
  });
};
export default pMap;
/**
*
const inputs = [200, 100, 50];
const mapper = (value) => delay(value, { value });
async function main() {
console.time("start");
const result = await pMap(inputs, mapper, { concurrency: 1 });
console.dir(result); // 输出结果:[ 200, 100, 50 ]
console.timeEnd("start");
}
// [ 200, 100, 50 ]
// start: 368.708ms
// 当把 concurrency 属性的值更改为 2 之后
[ 200, 100, 50 ]
start: 210.322ms
*/
import pMap from 'p-map';
/**
 * Run promise-returning functions concurrently with an optional concurrency limit.
 *
 * @param {Iterable<Function>} iterable - Functions to call; each is invoked with no arguments.
 * @param {object} [options] - Passed straight through to `pMap`
 *   (`concurrency`: maximum tasks in flight; `stopOnError`: whether to stop on the first failure).
 * @returns {Promise<unknown[]>} Whatever `pMap` resolves with for these tasks.
 */
export default async function pAll(iterable, options) {
  const invoke = (task) => task();
  return pMap(iterable, invoke, options);
}
import pMap from './p-map';
/**
 * Filter an iterable with a (possibly async) predicate, running the checks through `pMap`.
 *
 * @param {Iterable<Promise<unknown> | unknown>} iterable - Items to test.
 * @param {Function} filterer - `filterer(element, index)`; a truthy (or truthy-resolving) result keeps the element.
 * @param {object} [options] - Passed straight through to `pMap` (for example `concurrency`).
 * @returns {Promise<unknown[]>} The elements whose predicate result was truthy.
 */
export default async function pFilter(iterable, filterer, options) {
  // Pair every element with its verdict so the element is still at hand after `pMap` finishes.
  const judge = (element, index) =>
    Promise.all([filterer(element, index), element]);

  const verdicts = await pMap(iterable, judge, options);

  return verdicts
    .filter(([isKept]) => Boolean(isKept))
    .map(([, element]) => element);
}
/**
*
import pFilter from "p-filter";
const inputs = [Promise.resolve(1), 2, 3];
const filterer = (x) => x % 2;
async function main() {
const result = await pFilter(inputs, filterer, { concurrency: 1 });
console.dir(result); // 输出结果:[ 1, 3 ]
}
// filterer也可以是一个promise
import pFilter from "p-filter";
const inputs = [Promise.resolve(1), 2, 3];
const filterer = (x) => Promise.resolve(x % 2);
async function main() {
const result = await pFilter(inputs, filterer);
console.dir(result); // [ 1, 3 ]
}
*/
// Sentinel that `function_` returns to stop the loop.
const endSymbol = Symbol('pForever.end');

/**
 * Call a promise-returning function over and over until it returns `pForever.end`.
 *
 * Each call receives the awaited result of the previous call.
 *
 * @param {Function} function_ - Called as `function_(previousValue)`; may return a promise.
 * @param {unknown} [previousValue] - Initial value (awaited) handed to the first call.
 * @returns {Promise<void>} Fulfils once `function_` returns `pForever.end`; rejects if `function_` throws or rejects.
 */
const pForever = async (function_, previousValue) => {
  // A loop is used instead of `return pForever(...)`: recursion keeps one pending promise
  // alive per iteration, so memory grows for as long as the loop runs.
  let currentValue = previousValue;
  for (;;) {
    // eslint-disable-next-line no-await-in-loop
    const newValue = await function_(await currentValue);
    if (newValue === endSymbol) {
      return;
    }
    currentValue = newValue;
  }
};

pForever.end = endSymbol;
export default pForever;
/**
*
import delay from "delay";
import pForever from "p-forever";
async function main() {
let index = 0;
await pForever(async () => (++index === 10 ? pForever.end : delay(50)));
console.log("当前index的值: ", index); // 输出结果:当前index的值: 10
}
main();
*/
/**
 * Compose promise-returning or plain functions into a reusable pipeline.
 *
 * @param {...Function} functions - Steps applied left to right; each receives the awaited result of the previous one.
 * @returns {Function} `async input => result` that runs `input` through every step.
 * @throws {Error} When called without any function.
 */
export default function pPipe(...functions) {
  const stepCount = functions.length;
  if (stepCount === 0) {
    throw new Error('Expected at least one argument');
  }

  return async input => {
    let carried = input;
    let position = 0;
    while (position < stepCount) {
      const step = functions[position];
      carried = await step(carried); // eslint-disable-line no-await-in-loop
      position += 1;
    }
    return carried;
  };
}
/**
import pPipe from "p-pipe";
const addUnicorn = async (string) => `${string} Unicorn`;
const addRainbow = async (string) => `${string} Rainbow`;
const pipeline = pPipe(addUnicorn, addRainbow);
(async () => {
console.log(await pipeline("❤️")); // 输出结果:❤️ Unicorn Rainbow
})();
*/
import isEmptyIterable from 'isEmptyIterable';
/**
 * `Promise.race` with a guard for empty input.
 *
 * `Promise.race` on an empty iterable returns a promise that stays pending forever;
 * this wrapper rejects instead.
 *
 * @param {Iterable<unknown>} iterable - Values or promises to race.
 * @returns {Promise<unknown>} Settles like the first input to settle; rejects with a
 *   RangeError when `isEmptyIterable` reports the input as empty.
 */
export default async function pRace(iterable) {
  const hasItems = !isEmptyIterable(iterable);
  if (hasItems) {
    return Promise.race(iterable);
  }
  throw new RangeError('Expected the iterable to contain at least one item');
}
/**
 * Reduce an iterable of values or promises to a single value, one item at a time.
 *
 * @param {Iterable<unknown>} iterable - Items to fold; each is awaited before reaching the reducer.
 * @param {Function} reducer - `reducer(previousValue, currentValue, index)`; may return a promise.
 * @param {unknown} [initialValue] - Starting accumulator (may itself be a promise).
 * @returns {Promise<unknown>} The final accumulator; rejects when an item, the accumulator or the reducer fails.
 */
const pReduce = async (iterable, reducer, initialValue) =>
  new Promise((resolve, reject) => {
    const source = iterable[Symbol.iterator]();
    let position = 0;

    const step = async (accumulator) => {
      const entry = source.next();
      // Nothing left: hand back the accumulator (resolving with a promise adopts its state).
      if (entry.done) {
        resolve(accumulator);
        return;
      }

      try {
        // Settle the running total and the current item together before reducing.
        const settled = await Promise.all([accumulator, entry.value]);
        const [currentTotal, currentItem] = settled;
        step(reducer(currentTotal, currentItem, position++));
      } catch (failure) {
        reject(failure);
      }
    };

    step(initialValue);
  });
export default pReduce;
/**
const inputs = [Promise.resolve(1), delay(50, { value: 6 }), 8];
async function main() {
const result = await pReduce(inputs, async (a, b) => a + b, 0);
console.dir(result); // 输出结果:15
}
main();
*/
/**
 * Run tasks one after another, each waiting for the previous one to finish.
 *
 * Every task is type-checked first, so no task runs when any entry is not a function.
 *
 * @param {Iterable<Function>} tasks - Functions to call in order; each may return a promise.
 * @returns {Promise<unknown[]>} The awaited results, in task order. Rejects with a TypeError
 *   when an entry is not a function.
 */
export default async function pSeries(tasks) {
  for (const candidate of tasks) {
    const kind = typeof candidate;
    if (kind === 'function') {
      continue;
    }
    throw new TypeError(
      `Expected task to be a \`Function\`, received \`${kind}\``
    );
  }

  const collected = [];
  for (const run of tasks) {
    const outcome = await run(); // eslint-disable-line no-await-in-loop
    collected.push(outcome);
  }
  return collected;
}
/**
import pSeries from "p-series";
const tasks = [async () => 1 + 1, () => 2 + 2, async () => 3 + 3];
async function main() {
const result = await pSeries(tasks);
console.dir(result); // 输出结果:[2, 4, 6]
}
main();
*/
// Error type used when a promise does not settle within the allotted time.
export class TimeoutError extends Error {
  constructor(message) {
    super(message);
    this.name = 'TimeoutError';
  }
}

/**
 * Reject (or fall back) when a promise does not settle within `milliseconds`.
 *
 * @param {Promise<unknown>} promise - Promise to guard. If it has a `cancel()` method, that is called on timeout
 *   (only on the non-function-fallback path).
 * @param {number} milliseconds - Time allowed before the timeout fires; `Infinity` disables the timeout.
 * @param {Function | string | Error} [fallback] - Function: called on timeout and its result resolves the
 *   returned promise. String: message for the TimeoutError. Error: used as the rejection reason.
 * @param {object} [options]
 * @param {{setTimeout: Function, clearTimeout: Function}} [options.customTimers] - Replacement timer
 *   functions, e.g. for test tooling; the global timers are used when omitted.
 * @returns {Promise<unknown>} Settles like `promise`, or with the timeout outcome. Carries a `clear()`
 *   method that cancels the pending timeout. Rejects with a TypeError for an invalid `milliseconds`.
 */
export default function pTimeout(promise, milliseconds, fallback, options) {
  let timer;

  // Resolve the timer implementation once, so the timeout, the cleanup and `clear()`
  // all talk to the same timer functions.
  const { customTimers = { setTimeout, clearTimeout } } = options || {};

  const cancelablePromise = new Promise((resolve, reject) => {
    // Throwing inside the executor rejects the returned promise.
    if (typeof milliseconds !== 'number' || milliseconds < 0) {
      throw new TypeError('Expected `milliseconds` to be a positive number');
    }

    // An infinite timeout never fires, so simply follow the input promise.
    if (milliseconds === Number.POSITIVE_INFINITY) {
      resolve(promise);
      return;
    }

    timer = customTimers.setTimeout.call(
      undefined,
      () => {
        if (typeof fallback === 'function') {
          try {
            resolve(fallback());
          } catch (error) {
            reject(error);
          }
          return;
        }

        const message =
          typeof fallback === 'string'
            ? fallback
            : `Promise timed out after ${milliseconds} milliseconds`;
        const timeoutError =
          fallback instanceof Error ? fallback : new TimeoutError(message);

        if (typeof promise.cancel === 'function') {
          promise.cancel();
        }

        reject(timeoutError);
      },
      milliseconds
    );

    (async () => {
      try {
        resolve(await promise);
      } catch (error) {
        reject(error);
      } finally {
        // The input settled, so the timeout is no longer needed.
        customTimers.clearTimeout.call(undefined, timer);
      }
    })();
  });

  // Cancel the pending timeout with the same timer implementation that created it.
  cancelablePromise.clear = () => {
    if (timer === undefined) {
      return;
    }
    customTimers.clearTimeout.call(undefined, timer);
    timer = undefined;
  };

  return cancelablePromise;
}
/*
import {setTimeout} from 'timers/promises';
import pTimeout from 'p-timeout';
const delayedPromise = setTimeout(200);
await pTimeout(delayedPromise, 50);
let p = () => new Promise((resolve) => {
setTimeout(() => {
console.log('normal func');
resolve();
}, 1000);
});
pTimeout(p(), 500, () => {
console.log('has delay');
});
*/
import pMap from 'p-map';
/**
 * Call a promise-returning or async function a fixed number of times, concurrently.
 *
 * @param {number} count - How many times to call `mapper`.
 * @param {Function} mapper - Called as `mapper(index)` for each index from 0 to `count - 1`.
 * @param {object} [options] - Passed straight through to `pMap` (for example `concurrency`).
 * @returns {Promise<unknown[]>} Whatever `pMap` resolves with for these calls.
 */
export default function pTimes(count, mapper, options) {
  // `count` placeholder slots of `undefined`; only their indexes matter.
  const slots = Array.from({ length: count });
  const callWithIndex = (_, index) => mapper(index);
  return pMap(slots, callWithIndex, options);
}
/**
*
import delay from "delay";
import pTimes from "p-times";
async function main() {
console.time("start");
const result = await pTimes(5, async (i) => delay(50, { value: i * 10 }), {
concurrency: 3,
});
console.dir(result);
console.timeEnd("start");
}
main();
*/
import pReduce from 'p-reduce';
/**
 * Run functions left to right, feeding each one the result of the previous one.
 *
 * @param {Iterable<Function>} iterable - Steps to run; each receives the previous step's value.
 * @param {unknown} [initialValue] - Value handed to the first step as its `previousValue`.
 * @returns {Promise<unknown>} Whatever `pReduce` resolves with after the last step.
 */
export default async function pWaterfall(iterable, initialValue) {
  const applyStep = (previousValue, function_) => function_(previousValue);
  return pReduce(iterable, applyStep, initialValue);
}
/**
import pWaterfall from "p-waterfall";
const tasks = [
async (val) => val + 1,
(val) => val + 2,
async (val) => val + 3,
];
async function main() {
const result = await pWaterfall(tasks, 0);
console.dir(result); // 输出结果:6
}
main();
*/
/**
 * Wrap a callback-style function so that it returns a promise instead.
 *
 * @param {Function} fn - Function that takes a callback as its last argument.
 * @param {object} options - `promiseModule` (promise constructor to use), `errorFirst` (callback's first
 *   argument is an error) and `multiArgs` (settle with the array of all callback arguments).
 * @param {object} proxy - When the wrapper is called with `this === proxy`, `fn` runs against `unwrapped` instead.
 * @param {object} unwrapped - Receiver substituted for `proxy`.
 * @returns {Function} Wrapper that appends its own callback to the arguments and returns a promise.
 */
const processFn = (fn, options, proxy, unwrapped) =>
  function (...args) {
    const PromiseConstructor = options.promiseModule;

    return new PromiseConstructor((resolve, reject) => {
      let callback;

      if (options.multiArgs) {
        callback = (...results) => {
          if (!options.errorFirst) {
            resolve(results);
            return;
          }
          // Error-first with multiple args: a truthy first argument rejects with the whole array.
          if (results[0]) {
            reject(results);
            return;
          }
          // Otherwise drop the empty error slot and fulfil with the rest.
          results.shift();
          resolve(results);
        };
      } else if (options.errorFirst) {
        callback = (error, result) => {
          if (error) {
            reject(error);
            return;
          }
          resolve(result);
        };
      } else {
        callback = resolve;
      }

      args.push(callback);

      const receiver = this === proxy ? unwrapped : this;
      Reflect.apply(fn, receiver, args);
    });
  };
// Per-target cache of "should this key be promisified?" decisions.
const filterCache = new WeakMap();

/**
 * Return a proxy around `input` whose function (and function-valued properties) return promises.
 *
 * @param {Function | object} input - Callback-style function, or an object holding such functions.
 * @param {object} [options]
 * @param {Array<string | RegExp>} [options.include] - Only promisify keys matching one of these.
 * @param {Array<string | RegExp>} [options.exclude] - Keys to leave alone; by default those ending in `Sync` or `Stream`.
 * @param {boolean} [options.errorFirst=true] - Whether callbacks receive an error as their first argument.
 * @param {boolean} [options.multiArgs] - Settle with the array of all callback arguments.
 * @param {boolean} [options.excludeMain] - Leave `input` itself untouched when it is called directly.
 * @param {Function} [options.promiseModule=Promise] - Promise constructor to use.
 * @returns {Proxy} Proxy over `input`.
 * @throws {TypeError} When `input` is neither a function nor a non-null object.
 */
export default (input, options) => {
  options = {
    // The group must be non-capturing `(?:...)`; the previous literal had an unmatched `)`
    // and did not parse.
    exclude: [/.+(?:Sync|Stream)$/],
    errorFirst: true,
    promiseModule: Promise,
    ...options,
  };

  const objectType = typeof input;
  if (
    !(input !== null && (objectType === 'object' || objectType === 'function'))
  ) {
    throw new TypeError(
      `Expected \`input\` to be a \`Function\` or \`Object\`, got \`${
        input === null ? 'null' : objectType
      }\``
    );
  }

  // Decide (and remember) whether `target[key]` should be wrapped.
  const filter = (target, key) => {
    let cached = filterCache.get(target);
    if (!cached) {
      // No prototype, so `key in cached` cannot match inherited names such as `toString`.
      cached = Object.create(null);
      filterCache.set(target, cached);
    }

    if (key in cached) {
      return cached[key];
    }

    // String patterns and symbol keys compare by identity; otherwise the pattern is a RegExp.
    const match = (pattern) =>
      typeof pattern === 'string' || typeof key === 'symbol'
        ? key === pattern
        : pattern.test(key);

    const desc = Reflect.getOwnPropertyDescriptor(target, key);
    // Proxy invariants forbid returning a different value for a non-writable,
    // non-configurable own property, so such properties are never wrapped.
    const writableOrConfigurableOwn =
      desc === undefined || desc.writable || desc.configurable;
    const included = options.include
      ? options.include.some(match)
      : !options.exclude.some(match);
    const shouldFilter = included && writableOrConfigurableOwn;
    cached[key] = shouldFilter;
    return shouldFilter;
  };

  // Maps an original function to its promisified wrapper.
  const cache = new WeakMap();

  const proxy = new Proxy(input, {
    apply(target, thisArg, args) {
      const cached = cache.get(target);
      if (cached) {
        return Reflect.apply(cached, thisArg, args);
      }

      const pified = options.excludeMain
        ? target
        : processFn(target, options, proxy, target);
      cache.set(target, pified);
      return Reflect.apply(pified, thisArg, args);
    },

    get(target, key) {
      const property = target[key];

      // Filtered-out keys and the built-in Function.prototype members pass through untouched.
      if (!filter(target, key) || property === Function.prototype[key]) {
        return property;
      }

      const cached = cache.get(property);
      if (cached) {
        return cached;
      }

      if (typeof property === 'function') {
        const pified = processFn(property, options, proxy, target);
        cache.set(property, pified);
        return pified;
      }

      return property;
    },
  });

  return proxy;
};
import pTimeout from 'p-timeout';
// `Symbol.asyncIterator` names an object's default async iterator; an object that defines it is
// async-iterable and can be used with `for await...of`.
// Falls back to the string key '@@asyncIterator' when `Symbol.asyncIterator` is falsy.
const symbolAsyncIterator = Symbol.asyncIterator || '@@asyncIterator';
/**
 * Normalise the different emitter APIs to a single `{addListener, removeListener}` pair.
 *
 * Looks for `on`/`addListener`/`addEventListener` and `off`/`removeListener`/`removeEventListener`,
 * in that order, and binds the first one found to the emitter.
 *
 * @param {object} emitter
 * @returns {{addListener: Function, removeListener: Function}}
 * @throws {TypeError} When the emitter lacks a usable add or remove method.
 */
const normalizeEmitter = (emitter) => {
  // Same result as `emitter[a] || emitter[b] || emitter[c]`: the first truthy member wins.
  const firstMember = (names) => {
    let member;
    for (const name of names) {
      member = emitter[name];
      if (member) {
        break;
      }
    }
    return member;
  };

  const subscribe = firstMember(['on', 'addListener', 'addEventListener']);
  const unsubscribe = firstMember(['off', 'removeListener', 'removeEventListener']);

  if (!(subscribe && unsubscribe)) {
    throw new TypeError('Emitter is not compatible');
  }

  return {
    addListener: subscribe.bind(emitter),
    removeListener: unsubscribe.bind(emitter),
  };
};
// Wrap a single value in an array; arrays are returned as they are (same reference).
const toArray = (value) => {
  if (Array.isArray(value)) {
    return value;
  }
  return [value];
};
/**
 * Wait for several event emissions and resolve with the collected values.
 *
 * @param {object} emitter - Object exposing an add/remove listener pair accepted by `normalizeEmitter`.
 * @param {string | string[]} event - Event name(s) to collect.
 * @param {object} options
 * @param {number} options.count - Number of events required before resolving (integer >= 0 or Infinity).
 * @param {boolean} [options.resolveImmediately=false] - Resolve straight away with the (still growing) array;
 *   the returned array is mutated as events arrive.
 * @param {string[]} [options.rejectionEvents=['error']] - Events that reject the promise.
 * @param {boolean} [options.multiArgs=false] - Collect the full argument array of each event instead of the first argument.
 * @param {Function} [options.filter] - Only events whose value passes this predicate are collected.
 * @param {number} [options.timeout] - When a number, the result is wrapped with `pTimeout`.
 * @returns {Promise<unknown[]>} Promise with an extra `cancel()` method that removes every listener.
 */
const multiple = (emitter, event, options) => {
  const settings = {
    rejectionEvents: ['error'],
    multiArgs: false,
    resolveImmediately: false,
    ...options,
  };

  let cancel;

  const ret = new Promise((resolve, reject) => {
    const hasValidCount =
      settings.count >= 0 &&
      (settings.count === Infinity || Number.isInteger(settings.count));
    if (!hasValidCount) {
      throw new TypeError('The `count` option should be at least 0 or more');
    }

    // A single event name is treated as a one-element list.
    const events = toArray(event);
    const items = [];
    const { addListener, removeListener } = normalizeEmitter(emitter);

    const onItem = (...args) => {
      const value = settings.multiArgs ? args : args[0];
      if (settings.filter && !settings.filter(value)) {
        return;
      }

      items.push(value);
      if (settings.count === items.length) {
        cancel();
        resolve(items);
      }
    };

    const rejectHandler = (error) => {
      cancel();
      reject(error);
    };

    // Apply `register` (add or remove) to every listener: value events first, then rejection events.
    const forEachListener = (register) => {
      for (const eventName of events) {
        register(eventName, onItem);
      }
      for (const rejectionEvent of settings.rejectionEvents) {
        register(rejectionEvent, rejectHandler);
      }
    };

    cancel = () => {
      forEachListener(removeListener);
    };

    forEachListener(addListener);

    if (settings.resolveImmediately) {
      resolve(items);
    }
  });

  ret.cancel = cancel;

  if (typeof settings.timeout !== 'number') {
    return ret;
  }

  const timeout = pTimeout(ret, settings.timeout);
  timeout.cancel = cancel;
  return timeout;
};
/**
 * Promisify a single event: resolve with the value of the first matching emission.
 *
 * @param {object} emitter - Anything exposing `.on()`/`.addListener()`/`.addEventListener()` and
 *   `.off()`/`.removeListener()`/`.removeEventListener()`, such as an EventEmitter or a DOM node.
 * @param {string | string[]} event - Event name(s) to wait for.
 * @param {object | Function} [options] - Options forwarded to `multiple` (`rejectionEvents`, `multiArgs`,
 *   `timeout`, `filter`); a function is shorthand for `{filter: fn}`. `count` and `resolveImmediately`
 *   are always overridden with `1` and `false`.
 * @returns {Promise<unknown>} Promise for the first collected value, with the `cancel()` method of the
 *   underlying `multiple` promise attached.
 */
const pEvent = (emitter, event, options) => {
  const baseOptions =
    typeof options === 'function' ? { filter: options } : options;

  const arrayPromise = multiple(emitter, event, {
    ...baseOptions,
    count: 1,
    resolveImmediately: false,
  });

  const promise = arrayPromise.then((items) => items[0]); // eslint-disable-line promise/prefer-await-to-then
  promise.cancel = arrayPromise.cancel;
  return promise;
};
/**
 * Return an async iterator over the values of events emitted by `emitter`.
 *
 * The iterator ends when an event listed in `resolutionEvents` fires or when `limit` value events
 * have been seen; it rejects when an event listed in `rejectionEvents` fires.
 *
 * @param {object} emitter - Object exposing an add/remove listener pair accepted by `normalizeEmitter`.
 * @param {string | string[]} event - Event name(s) to iterate over.
 * @param {object | Function} [options] - A function is shorthand for `{filter: fn}`.
 * @param {string[]} [options.rejectionEvents=['error']] - Events that make the iterator throw.
 * @param {string[]} [options.resolutionEvents=[]] - Events that end the iterator.
 * @param {number} [options.limit=Infinity] - Maximum number of value events (non-negative integer or Infinity).
 * @param {boolean} [options.multiArgs=false] - Yield the full argument array instead of the first argument.
 * @param {Function} [options.filter] - Applied to resolution-event values only.
 * @returns {AsyncIterator<unknown>} Object with `next()` and `return()`; it is its own async iterable.
 * @throws {TypeError} When `limit` is invalid or the emitter is not compatible.
 */
const iterator = (emitter, event, options) => {
  if (typeof options === 'function') {
    options = { filter: options };
  }

  // Allow multiple events
  const events = toArray(event);

  options = {
    rejectionEvents: ['error'],
    resolutionEvents: [],
    limit: Infinity,
    multiArgs: false,
    ...options,
  };

  const { limit } = options;
  const isValidLimit =
    limit >= 0 && (limit === Infinity || Number.isInteger(limit));
  if (!isValidLimit) {
    throw new TypeError(
      'The `limit` option should be a non-negative integer or Infinity'
    );
  }

  if (limit === 0) {
    // Return an empty async iterator to avoid any further cost.
    // Keyed with `symbolAsyncIterator` so both return paths expose the iterator the same way.
    return {
      [symbolAsyncIterator]() {
        return this;
      },
      async next() {
        return {
          done: true,
          value: undefined,
        };
      },
    };
  }

  const { addListener, removeListener } = normalizeEmitter(emitter);

  let isDone = false;
  let error;
  let hasPendingError = false;
  // Pending `next()` calls waiting for a value.
  const nextQueue = [];
  // Values that arrived before anyone asked for them.
  const valueQueue = [];
  let eventCount = 0;
  let isLimitReached = false;

  const valueHandler = (...args) => {
    eventCount++;
    isLimitReached = eventCount === limit;

    const value = options.multiArgs ? args : args[0];

    // Hand the value to a waiting consumer if there is one; otherwise buffer it.
    if (nextQueue.length > 0) {
      const { resolve } = nextQueue.shift();
      resolve({ done: false, value });

      if (isLimitReached) {
        cancel();
      }

      return;
    }

    valueQueue.push(value);

    if (isLimitReached) {
      cancel();
    }
  };

  // Stop listening and finish every consumer that is still waiting.
  const cancel = () => {
    isDone = true;

    for (const event of events) {
      removeListener(event, valueHandler);
    }

    for (const rejectionEvent of options.rejectionEvents) {
      removeListener(rejectionEvent, rejectHandler);
    }

    for (const resolutionEvent of options.resolutionEvents) {
      removeListener(resolutionEvent, resolveHandler);
    }

    while (nextQueue.length > 0) {
      const { resolve } = nextQueue.shift();
      resolve({ done: true, value: undefined });
    }
  };

  const rejectHandler = (...args) => {
    error = options.multiArgs ? args : args[0];

    // Reject a waiting consumer now, or remember the error for the next `next()` call.
    if (nextQueue.length > 0) {
      const { reject } = nextQueue.shift();
      reject(error);
    } else {
      hasPendingError = true;
    }

    cancel();
  };

  const resolveHandler = (...args) => {
    const value = options.multiArgs ? args : args[0];

    if (options.filter && !options.filter(value)) {
      return;
    }

    if (nextQueue.length > 0) {
      const { resolve } = nextQueue.shift();
      resolve({ done: true, value });
    } else {
      valueQueue.push(value);
    }

    cancel();
  };

  for (const event of events) {
    addListener(event, valueHandler);
  }

  for (const rejectionEvent of options.rejectionEvents) {
    addListener(rejectionEvent, rejectHandler);
  }

  for (const resolutionEvent of options.resolutionEvents) {
    addListener(resolutionEvent, resolveHandler);
  }

  return {
    [symbolAsyncIterator]() {
      return this;
    },

    async next() {
      // Buffered values are drained first, even after the listeners were removed.
      if (valueQueue.length > 0) {
        const value = valueQueue.shift();
        return {
          done: isDone && valueQueue.length === 0 && !isLimitReached,
          value,
        };
      }

      if (hasPendingError) {
        hasPendingError = false;
        throw error;
      }

      if (isDone) {
        return {
          done: true,
          value: undefined,
        };
      }

      // Nothing buffered yet: park this request until an event arrives.
      return new Promise((resolve, reject) =>
        nextQueue.push({ resolve, reject })
      );
    },

    async return(value) {
      cancel();
      return {
        done: isDone,
        value,
      };
    },
  };
};
// Re-export the timeout error type, read off the imported `pTimeout` default export.
// NOTE(review): this is `undefined` unless that export carries a `TimeoutError` property —
// confirm against the `p-timeout` build that is actually imported.
const TimeoutError = pTimeout.TimeoutError;
// `pEvent` is the default export; the helpers below are named exports.
export default pEvent;
export {
multiple,
iterator,
TimeoutError
};
/**
*
const pEvent = require('p-event');
(async () => {
await pEvent(document, 'DOMContentLoaded');
console.log('😎');
})();
// async iteration
const pEvent = require('p-event');
const emitter = require('./some-event-emitter');
(async () => {
const asyncIterator = pEvent.iterator(emitter, 'data', {
resolutionEvents: ['finish']
});
for await (const event of asyncIterator) {
console.log(event);
}
})();
*/
学习源码: https://github.com/sindresorhus/promise-fun。 关于promise风格代码的管理