taskforcesh / bullmq-pro-support

Support repository for BullMQ Pro edition.

Dynamic Group Rate Limits Without Retrying Job #64

Closed brandonaaron closed 9 months ago

brandonaaron commented 9 months ago

I'd like to have a single group rate limited differently than the other groups. However, I do not want the current job to be retried as well. For example, one group would be rate limited to 1 job per minute, while the other groups would have no limit (or potentially a different dynamic limit). In the rate-limited group, the limit would ideally take effect after the job completes, without forcing that job to be reprocessed.

Tweaking the example from the docs:

import { WorkerPro } from '@taskforcesh/bullmq-pro';

const duration = 60_000;
const worker = new WorkerPro(
  'myQueue',
  async job => {
    const groupId = job.opts.group.id;
    const result = await doWork(job.data);
    await worker.rateLimitGroup(job, duration);

    // can I avoid throwing the RateLimitError and just return the result instead?
    // throw WorkerPro.RateLimitError();
    return result
  },
  {
    connection,
  },
);

I thought I might be able to use pauseGroup and resumeGroup to work around it, but there are multiple instances of the worker in order to deal with the workload of the other groups. Maybe I could add a delayed job to another group (or to no group) that calls resumeGroup as a workaround.

Would it be possible to rate limit the group without also throwing the RateLimitError()?

dginzbourg commented 9 months ago

I'm just curious what behavior (i.e. use case) you are targeting. Rate limiting is a back-pressure mechanism, and I'd expect it to be applied not after the job has completed, but at the beginning or in the middle of the job's processing logic. If you don't throw RateLimitError, the job is completed, as in your example. You can add a condition that decides whether to delay the current job (apply back pressure) or mark it as completed (return the result). Rate limiting works without you throwing an exception, i.e. BullMQ will schedule no more than 1 job per group every 60s.
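A minimal sketch of that "delay or complete" condition, written so it can be exercised without Redis. Only `rateLimitGroup(job, duration)` and `WorkerPro.RateLimitError()` come from this thread; `makeProcessor`, `callApi`, and the `{ limited, retryAfterMs, result }` shape are hypothetical names used for illustration.

```javascript
// Hypothetical factory, not part of BullMQ Pro.
// - worker: any object exposing rateLimitGroup(job, ms), e.g. a WorkerPro instance
// - rateLimitError: builds the special error, e.g. () => WorkerPro.RateLimitError()
// - callApi: stand-in for the third-party call; its return shape is assumed
function makeProcessor({ worker, rateLimitError, callApi }) {
  return async function processor(job) {
    const { limited, retryAfterMs, result } = await callApi(job.data);

    if (limited) {
      // Back pressure: delay the job's group, then throw so the job is
      // handed back to the queue instead of being marked as completed.
      await worker.rateLimitGroup(job, retryAfterMs);
      throw rateLimitError();
    }

    // Not limited: returning a value completes the job normally.
    return result;
  };
}
```

With the real library, this would presumably be wired as `new WorkerPro(queueName, processor, { connection })`, where `processor` comes from `makeProcessor` and the worker reference is supplied once the instance exists.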

brandonaaron commented 9 months ago

I have a third-party API with low rate limits. I'd like to have just this API rate limited within its own group (versus limiting all groups the same way or starting a new queue/worker/process just for this). Ideally I could specify a limit in the config for only this group. From reading other issues on the topic, it seems the way to achieve this kind of behavior is by calling rateLimitGroup.

I may have misunderstood but it seemed to me you were saying I should be able to call rateLimitGroup without also throwing the error. However, when I do that it throws an error.

Error: Missing lock for job 600acdd8710b5f001759ca28. failed
    at ScriptsPro.finishedErrors (./node_modules/bullmq/dist/cjs/classes/scripts.js:213:24)
    at ScriptsPro.finishedErrors (./node_modules/@taskforcesh/bullmq-pro/dist/cjs/classes/scripts-pro.js:449:30)
    at JobPro.moveToFailed (./node_modules/@taskforcesh/bullmq-pro/dist/cjs/classes/job-pro.js:208:32)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)
    at async handleFailed (./node_modules/bullmq/dist/cjs/classes/worker.js:335:21)
    at async WorkerPro.retryIfFailed (./node_modules/bullmq/dist/cjs/classes/worker.js:537:24)

Here is a reduced test case:

const Redis = require('ioredis')
const { Queue } = require('@taskforcesh/bullmq-pro')
const { WorkerPro } = require('@taskforcesh/bullmq-pro')

const connection = new Redis('redis://127.0.0.1:6378/1', { maxRetriesPerRequest: null })

const queueName = 'group-dynamic-rate-limit'
const queue = new Queue(queueName, { connection })

const worker = new WorkerPro(queueName, async (job, token) => {
    console.log('working job...', job.name, job.data)
    if (job.opts?.group?.id) {
        // await job.moveToCompleted(job.data, token, false)
        await worker.rateLimitGroup(job, 5_000)
        // throw WorkerPro.RateLimitError()
    } else {
        // await job.moveToCompleted(job.data, token, false)
        await worker.rateLimit(5_000)
        // throw WorkerPro.RateLimitError()
    }
}, { connection })

worker.on('ready', () => { console.log('ready') })
worker.on('active', () => { console.log('active') })
worker.on('completed', (job, result) => { console.log('done', result) })

queue.add('test-group-job', { testGroup: 1 }, {
    group: { id: 'test-group' }
})
queue.add('test-group-job', { testGroup: 2 }, {
    group: { id: 'test-group' }
})
queue.add('test-group-job', { testGroup: 3 }, {
    group: { id: 'test-group' }
})

queue.add('test-job', { test: 1 })
queue.add('test-job', { test: 2 })
queue.add('test-job', { test: 3 })
queue.add('test-job', { test: 4 })

This throws the previously mentioned error stack ("Missing lock...") for the group based job.

It doesn't throw an error for the non-group based job but also does not actually rate limit unless the RateLimitError is thrown. Throwing the RateLimitError, maybe obviously, results in the "completed" event not firing.

If I uncomment the moveToCompleted and throw calls it fails for both group and non-group jobs with a "Missing lock for job".

Maybe I'm missing something or doing something wrong? Using "@taskforcesh/bullmq-pro": "^6.8.0".

brandonaaron commented 9 months ago

Thinking about this some more, I don't think I can actually satisfy my use case with the dynamic rate limit API, even if it worked the way I was trying to use it. With multiple workers and concurrency, several jobs would be picked up before any worker got the chance to call rateLimitGroup.

dginzbourg commented 9 months ago

You misunderstood me. When you call the rate-limit delay, you have to throw. You didn't give an example of your business use case, which would help me understand the problem and suggest a design. It sounds like you plan to use groups for different logical tasks (but I might be mistaken). Groups are logical queues inside a Queue, and I'd expect the logic to be the same for all groups. If that is the case, then when you call your slow third-party API, apply the rate-limit delay and throw the rate-limit exception. Let's say the group ID represents your user, and each user can submit some activity that requires a third-party API call. If that API call is rate limited per user, then by delaying and throwing the exception, each group will continue at a slow pace. BullMQ ensures that tasks inside a group are processed according to the rate limit across all workers: for 5 tasks/min, you will be called no more than 5 times each minute. When you throw the rate-limit exception, you delay that specific task in that group to a later time.
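For the original question (limiting only one group), a rough sketch under stated assumptions might look like the following. The `GROUP_LIMITS_MS` map, the `'slow-api'` group ID, `makeGroupAwareProcessor`, `isThrottled`, and `doWork` are all hypothetical; only `rateLimitGroup(job, duration)` followed by throwing the rate-limit error is taken from this thread.

```javascript
// Hypothetical per-group limits in milliseconds.
// Groups not listed here are never rate limited by this processor.
const GROUP_LIMITS_MS = new Map([['slow-api', 60_000]]);

// Hypothetical factory, not part of BullMQ Pro.
// - isThrottled(groupId): assumed check for whether the third-party API
//   is currently refusing calls for this group
// - doWork(data): the actual job logic
function makeGroupAwareProcessor({ worker, rateLimitError, isThrottled, doWork }) {
  return async function processor(job) {
    const groupId = job.opts?.group?.id;
    const limitMs = GROUP_LIMITS_MS.get(groupId);

    // Only groups with a configured limit can be delayed.
    if (limitMs !== undefined && (await isThrottled(groupId))) {
      await worker.rateLimitGroup(job, limitMs);
      // Required after the delay: the job goes back to its group.
      throw rateLimitError();
    }

    // Unlimited groups, or limited groups with capacity, run normally.
    return doWork(job.data);
  };
}
```

Because the delayed job is thrown back rather than completed, it fires no "completed" event until a later attempt succeeds. With several worker instances, `isThrottled` would need to consult state that all instances share.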

brandonaaron commented 9 months ago

Ok. Understood. Thanks for the quick feedback!