taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License
5.99k stars 387 forks source link

Rate limiter per job name #358

Open francescov1 opened 3 years ago

francescov1 commented 3 years ago

Want to explore the possibility to rate limit a queue based on the job name. This would be a worker-level config which would enable applying a rate limiter to specific job names. This would be similar to the groupKey but would remove the need to pass an extra groupKey param in job data, the job name itself would act as the groupKey. I like this because it centralizes the rate limit implementation to solely the worker, doesn't matter what is passed as job data as long as the job name is correct.

My thoughts are something along these lines:

const worker = new Worker("myQueue",  async job => {

    switch (job.name) {
      case "limitedJob": {
        // rate limited 
        break;
      }
      case "normalJob": {
       // no rate limiting applied
        break;
      }
      default:

    }
  },
  {
    limiter: {
      jobs: {
          // this key indicates the job name to be limited
         limitedJob: {
            // same rate limit config as usual
            max: 30,
            duration: 1000
         }
      }
    }
  }
);

Happy to look into implementing this myself if it's a possibility!

francescov1 commented 3 years ago

Any thoughts on this? Would like to start exploring it but want to ensure it makes sense first.

manast commented 3 years ago

Functionally I think it seems like a nice fit, however I am not sure if it can be implemented cleanly in current design, I would need to look more deeply into it. You are welcome to start looking at it but limiting by groups is already a bit too complex for my liking so I would rather skip a functionality like this if it implies adding yet more complexity.

Kukunin commented 3 years ago

As an alternative to implementing this into BullMQ, a separate limiter might be useful, such as https://github.com/microlinkhq/async-ratelimiter.

The core feature of any limiter is just await delay(X) where X depends on previous calls, so it's possible to write a simple wrapper around the job handler that will wait a correspondent amount of time (0 if it's under the limit). Or, a wrapper that will reschedule the job with the right delay (if BullMQ allows it)

Kukunin commented 3 years ago

Here is a code snippet that I have in my project:

const { RateLimiterRedis } = require('rate-limiter-flexible')

const withRateLimit = (fn, { key, max, duration }) => {
  const limiter = new RateLimiterRedis({
    storeClient: new Redis(),
    points: max,
    duration: duration,
    keyPrefix: key,
  })

  const wrapper = async (...args) => {
    try {
      await limiter.consume('')
      return fn(...args)
    } catch (e) {
      if (e instanceof Error) {
        throw e
      }
      const delay = e.msBeforeNext
      if (delay > 0) {
        await new Promise((resolve) => setTimeout(resolve, delay))
      }
      return wrapper(...args)
    }
  }
  return wrapper
}

I can use it as:

new Worker('queue', withRateLimit(async () => {
  // job handler here
}, { key: 'my queue', max: 10, duration: 60 })

The only limitation, that duration is in seconds.

francescov1 commented 3 years ago

Looks great, I'll try it out! Thanks @Kukunin!