denoland / std

The Deno Standard Library
https://jsr.io/@std
MIT License

Feature request (async): limit() to limit a function concurrency #4766

Open guy-borderless opened 3 months ago

guy-borderless commented 3 months ago

Is your feature request related to a problem? Please describe. I find p-limit helpful (APIs often limit user concurrency). The async module already has retry, debounce, and pooledMap, which are similar. An API like p-limit in deno-std would be great.

import pLimit from 'p-limit';

const limit = pLimit(10);

// only 10 concurrent requests
export const geocodeAddress = limit(async (address: string) => {...})

iuioiua commented 3 months ago

Doesn't pooledMap() achieve essentially the same thing with its poolLimit parameter?

guy-borderless commented 3 months ago

Not easily, I think. This scenario is for when we don't know the iterable up front, so we would still have to create and manage an iterable here (and would probably prefer to use some library for it). How would you write the above code with pooledMap?
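For reference, the wrapper shape being asked for can be sketched without any iterable. This is only a minimal sketch: `limitConcurrency` is a hypothetical name, not part of @std/async, and the geocoding body is a stand-in for a real request.

```typescript
// Hypothetical helper, not part of @std/async: wraps an async function so that
// at most `concurrency` calls run at once. Extra calls wait in FIFO order.
function limitConcurrency<A extends unknown[], R>(
  concurrency: number,
  fn: (...args: A) => Promise<R>,
): (...args: A) => Promise<R> {
  let active = 0;
  const waiting: Array<() => void> = [];

  const release = () => {
    const next = waiting.shift();
    if (next) {
      next(); // hand the slot directly to the next waiter
    } else {
      active--; // nobody is waiting, so free the slot
    }
  };

  return async (...args: A): Promise<R> => {
    if (active >= concurrency) {
      // All slots are busy: wait until a finishing call hands one over.
      await new Promise<void>((resolve) => waiting.push(resolve));
    } else {
      active++;
    }
    try {
      return await fn(...args);
    } finally {
      release();
    }
  };
}

// Hypothetical stand-in for a real geocoding request.
const geocodeAddress = limitConcurrency(10, async (address: string) => {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return address.toUpperCase();
});

// Calls can be made from anywhere; each caller gets its own result back.
geocodeAddress("1 Main St").then((result) => console.log(result));
```

The state (a counter plus a queue of waiters) lives inside the wrapper, so callers never have to build or close an iterable.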

BlackAsLight commented 3 months ago

With the above code, how do you imagine the iterable would be provided to it?

kt3k commented 3 months ago

I'm not sure pLimit is a common abstraction for that situation/requirement.

Could the Semaphore class in this package ( https://jsr.io/@lambdalisue/async ) achieve the same thing?

This issue ( https://github.com/denoland/deno_std/issues/4536 ) might be related to this topic.

guy-borderless commented 3 months ago

Would that be:

import { Semaphore } from "@std/async";

const limiter = new Semaphore(10);

// only 10 concurrent requests
export const geocodeAddress = limiter.lock(async (address: string) => {...})

If it's about that, np. Btw, how would you rate limit invocations? (max calls per duration)
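On the rate-limiting question (max calls per duration), one possible approach is to track the scheduled start times of the most recent calls and delay any call that would exceed the budget. This is a rough sketch under stated assumptions: `rateLimit` is a hypothetical helper, not an existing @std/async API, and it relies on `setTimeout`, so actual start times are only as precise as the timer.

```typescript
// Hypothetical helper: lets at most `maxCalls` invocations start within any
// `windowMs` window. Later calls are delayed rather than rejected.
function rateLimit<A extends unknown[], R>(
  maxCalls: number,
  windowMs: number,
  fn: (...args: A) => Promise<R>,
): (...args: A) => Promise<R> {
  // Scheduled start times of the most recent `maxCalls` invocations.
  const starts: number[] = [];

  return async (...args: A): Promise<R> => {
    const now = Date.now();
    const oldest = starts[0];
    // If the window is full, the earliest allowed start is one window after
    // the call that sits `maxCalls` positions back.
    const startAt = starts.length < maxCalls || oldest === undefined
      ? now
      : Math.max(now, oldest + windowMs);
    starts.push(startAt);
    if (starts.length > maxCalls) starts.shift();

    if (startAt > now) {
      await new Promise((resolve) => setTimeout(resolve, startAt - now));
    }
    return fn(...args);
  };
}

// At most 2 calls per 50 ms; the third call is delayed by roughly 50 ms.
const ping = rateLimit(2, 50, async (id: number) => id);
Promise.all([ping(1), ping(2), ping(3)]).then((ids) => console.log(ids));
```

Note that this limits how often calls start, not how many are in flight at once, so it complements a concurrency limit rather than replacing it.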

BlackAsLight commented 3 months ago

I imagine you could do something like this to accomplish the same thing you're after.

import { pooledMap } from '@std/async'

function pLimit(poolLimit: number) {
  return function <T, R>(iteratorFn: (data: T) => Promise<R>) {
    return function (array: Iterable<T> | AsyncIterable<T>) {
      return pooledMap(poolLimit, array, iteratorFn)
    }
  }
}

const limit = pLimit(10)

const geocodeAddress = limit(async (address: string) => {...})

geocodeAddress(['an array of addresses'])

guy-borderless commented 3 months ago

Your code is not equivalent. It requires all addresses to be batched in the same lexical place. In practice, to be equivalent you need to introduce state.

BlackAsLight commented 3 months ago

const { readable, writable } = new TransformStream<string, string>()
geocodeAddress(readable)

const writer = writable.getWriter()

// write an address in different places
writer.write('address')

// call at the very end, when you know there are no more addresses
writer.close()

BlackAsLight commented 3 months ago

import { pooledMap } from '@std/async'

const geocodeAddress = function() {
  const { readable, writable } = new TransformStream<string, string>()
  pooledMap(10, readable, async (address: string) => {
    ...
  })
  return writable.getWriter()
}()

geocodeAddress.write('address')

geocodeAddress.close()

guy-borderless commented 3 months ago

Judging by the "leakiness" and general friction seen here, it does seem a primitive is missing. I think it would be useful to have something like a semaphore in the standard library.

enote-kane commented 2 months ago

+1 for a semaphore

I just hit a case where I have nested arrays of promises, all ending up in remote API calls that overload the remote (or risk getting me blocked/banned for too many open connections), so I need a way to limit the number of "active" API calls right where they happen.
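To make the request concrete, a counting semaphore for this nested-promises case might look roughly like the sketch below. Everything here is an assumption for illustration: the `Semaphore` class, its `acquire`/`release`/`run` methods, and `callRemote` are hypothetical names, not an existing @std/async API.

```typescript
// Hypothetical counting semaphore; the names and shape are illustrative only.
class Semaphore {
  private permits: number;
  private waiters: Array<() => void> = [];

  constructor(permits: number) {
    this.permits = permits;
  }

  // Resolves once a permit is available.
  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  // Hands the permit to the next waiter, or returns it to the pool.
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next();
    } else {
      this.permits++;
    }
  }

  // Runs `fn` while holding a permit and always releases it afterwards.
  async run<R>(fn: () => Promise<R>): Promise<R> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

// One shared semaphore guards the remote call itself, so the limit holds no
// matter how the promises are nested by the callers.
const apiSlots = new Semaphore(3);

// Hypothetical stand-in for a remote API call.
function callRemote(id: number): Promise<number> {
  return apiSlots.run(async () => {
    await new Promise((resolve) => setTimeout(resolve, 5));
    return id;
  });
}

const nested = [[1, 2], [3, 4, 5], [6]];
Promise.all(
  nested.map((group) => Promise.all(group.map((id) => callRemote(id)))),
).then((results) => console.log(results));
```

Because the limit is applied inside `callRemote`, the outer `Promise.all` calls stay unchanged and at most three requests are in flight at any time.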