actionhero / node-resque

Node.js background jobs backed by Redis.
https://node-resque.actionherojs.com
Apache License 2.0

How to control the speed of performing a job #326

evershy closed this issue 4 years ago

evershy commented 4 years ago

I need to make requests to an API, but the API limits requests to 10 per second. When the Redis queue A has 1000 `jobB` jobs to perform, how can I control the speed at which the jobs are performed, e.g. run them at a constant rate of 10 jobs per second?

  const jobs = {
    jobB: {
      plugins: ["JobLock"],
      pluginOptions: {
        JobLock: {}
      },
      perform: async (a, b) => {
        // request the API
      }
    }
  };
evantahler commented 4 years ago

Hi @evershy!

It sounds like you want to implement a rate-limiting plugin. Since you are describing a limit on requests per second (rather than requests running in parallel), you could use an expiring key for this:

* in the `beforePerform` method of the plugin, check whether a key exists in Redis and whether its value is < 10

* if the key has already reached 10, re-enqueue the job for 1 second from now (`queue.enqueueIn()`)

* if the key doesn't exist, or its value is less than 10, you can move forward with the job:

  * increment the key by 1, so this worker claims one of the 10 'slots' for this second
  * set a TTL/expiry of 1 second on the key so it deletes itself automatically after one second

It's probably best not to make a key for the timestamp of each second (the clocks on your various computers will drift) but just to have one key.

Once you have your plugin built, you could configure your job like this to enforce the limit:

  const jobs = {
    jobB: {
      plugins: ["JobLock", "RateLimit"],
      pluginOptions: {
        JobLock: {},
        RateLimit: { limitPerSecond: 10 }
      },
      perform: async (a, b) => {
        // request the API
      }
    }
  };
evantahler commented 4 years ago

Also, if you want to add a plugin like this to node-resque directly, that would be wonderful!

evershy commented 4 years ago

I implemented the RateLimit plugin following your advice. I would be grateful if you could help check the logic below.

const NodeResque = require("node-resque");

// Limits a job to `limitPerSecond` runs per second using one expiring Redis counter key
class RateLimit extends NodeResque.Plugin {
  constructor(worker, func, queue, job, args, options) {
    super(worker, func, queue, job, args, options);

    // Default to 10 jobs per second when no limit is configured
    if (!this.options.limitPerSecond) {
      this.options.limitPerSecond = 10;
    }
  }

  async beforePerform() {
    const redis = this.queueObject.connection.redis;
    const key = this.key();
    const limitPerSecond = this.options.limitPerSecond;

    // Jobs already started in the current window (0 if the key has expired)
    const current = parseInt((await redis.get(key)) || "0", 10);

    // Use `<` so that at most `limitPerSecond` jobs claim a slot per window
    if (current < limitPerSecond) {
      // Claim a slot: store the new count and (re)set a 1-second TTL so the key expires by itself
      await redis.set(key, current + 1, "EX", 1);
      return true; // run the job now
    } else {
      // Limit reached: schedule the job again later and skip this run
      await this.reEnqueue();
      return false;
    }
  }

  async reEnqueue() {
    await this.queueObject.enqueueIn(
      this.enqueueTimeout(),
      this.queue,
      this.func,
      this.args
    );
  }

  // Delay before a rate-limited job is retried
  enqueueTimeout() {
    if (this.options.enqueueTimeout) {
      return this.options.enqueueTimeout;
    } else {
      return 1001; // in ms
    }
  }

  // One shared counter key per job name and queue, unless a custom key is configured
  key() {
    if (this.options.key) {
      return typeof this.options.key === "function"
        ? this.options.key.apply(this)
        : this.options.key;
    } else {
      return this.worker.connection.key("ratelimit", this.func, this.queue);
    }
  }
}

module.exports = RateLimit;
evantahler commented 4 years ago

At a quick glance, that looks like a good start! Be sure to write some tests for it.

There's a race condition you can get into: this worker reads the key (it has a value of 9), but by the time it writes back, another worker has already started the 10th job. You can use WATCH to prevent this: https://redis.io/topics/transactions#optimistic-locking-using-check-and-set

Here's an example of the node-resque scheduler using WATCH to prevent a command from running if someone else changes the key: https://github.com/actionhero/node-resque/blob/master/src/core/scheduler.ts#L268-L280
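A minimal sketch of the WATCH-based check-and-set for the rate-limit counter, in the same JavaScript as the plugin. It assumes an ioredis-style client (such as `this.queueObject.connection.redis`) that offers `watch`, `unwatch`, `get` and `multi().set(...).exec()`, and that `exec()` resolves to `null` when a watched key changed before EXEC; check that behaviour against the Redis client you actually use. `claimSlot` and `maxRetries` are hypothetical names for illustration, not part of node-resque.

```javascript
// Hypothetical helper: claim one of `limitPerSecond` slots in the current window
// using optimistic locking (WATCH + MULTI/EXEC).
// Assumption: `redis` is an ioredis-style client whose multi().exec() resolves
// to null when a WATCHed key was modified before EXEC.
async function claimSlot(redis, key, limitPerSecond, maxRetries = 5) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    // From here on, the transaction below aborts if `key` is modified before EXEC.
    await redis.watch(key);
    const current = parseInt((await redis.get(key)) || "0", 10);

    if (current >= limitPerSecond) {
      // Every slot in this window is taken: drop the WATCH and report "no".
      await redis.unwatch();
      return false;
    }

    // Write the new count with a fresh 1-second TTL. Re-setting the TTL on every
    // claim means the key is never left without an expiry; the window then ends
    // 1 second after the last claim, which can only make the limit stricter.
    const result = await redis
      .multi()
      .set(key, current + 1, "EX", 1)
      .exec();

    if (result !== null) {
      return true; // EXEC ran, so this worker owns the slot
    }
    // EXEC was aborted: another worker changed `key` after our GET. Re-read and retry.
  }
  // Still contended after maxRetries attempts: treat it as "over the limit".
  return false;
}
```

Inside the plugin, `beforePerform` could call `claimSlot(this.queueObject.connection.redis, this.key(), this.options.limitPerSecond)`, return `true` when it succeeds, and call `this.reEnqueue()` and return `false` otherwise. WATCH state belongs to a single Redis connection, so this only isolates workers that use separate connections. An alternative that avoids WATCH is Redis' atomic INCR: increment first and compare the returned count with the limit, taking care that the key can never be left without a TTL.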

evershy commented 4 years ago

Thank you very much.

juneapp commented 4 years ago

Hi @evershy, can you provide me with your final RateLimit version? Thanks a lot and best regards, Henning