taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

Flows - Ratelimiting issues #621

Closed: germanoeich closed this issue 3 years ago

germanoeich commented 3 years ago

Hello!

I have observed some weird behaviour when using the Worker limiter together with a flow. No matter what I try, I can't get groupKey-style rate limiting to work with flows; in fact, when groupKey is set, rate limiting for the flow jobs is broken entirely. I noticed FlowJob has a rateLimiterKey property, but I am unsure what that field does, and the combinations I have tried (the job data field name, random values for each parent, setting it on the children, etc.) don't seem to affect the rate limiter at all.

Here's some repro code:

import { FlowProducer, Job, QueueScheduler, Worker } from 'bullmq';

// Processor: just logs each job so the execution order/timing is visible.
export default async function process(job: Job, token?: string) {
    console.log(`Received job for ${job.name} ${job.id}`);
}

const queueName = 'test';

// At most 1 job per 5s per group, grouped by the `guild` field of the job data.
const worker = new Worker(queueName, process, {
    concurrency: 100,
    limiter: {
        max: 1,
        duration: 5000,
        groupKey: 'guild',
    },
});

const qp = new QueueScheduler(queueName);
const f = new FlowProducer();
f.add({
    name: 'test',
    queueName: queueName,
    data: { guild: 1 },
    children: [
        {
            name: 'testd1-1',
            data: { guild: 1 },
            queueName,
            children: [
                {
                    name: 'testd2-1',
                    queueName,
                    data: { guild: 1 },
                    children: [{
                        name: 'testd3-1',
                        queueName
                    }],
                },
            ],
        },
    ],
});

f.add({
    name: 'test2',
    queueName: queueName,
    data: { guild: 2 },
    children: [
        {
            name: 'testd1-2',
            data: { guild: 2 },
            queueName,
            children: [
                {
                    name: 'testd2-2',
                    queueName,
                    data: { guild: 2 },
                    children: [{
                        name: 'testd3-2',
                        queueName
                    }],
                },
            ],
        },
    ],
});

Version: 1.36.1

What I expect to happen: jobs testd3-1 and testd3-2 are fired simultaneously; after 5s, testd2-1 and testd2-2 are fired simultaneously, and so on and so forth.

What is happening: the jobs are not being rate limited at all; only the children -> parent order is respected.

If I remove the groupKey from the worker, the jobs are rate limited correctly (in this case, only 1 job is run every 5s), but that's not the behaviour I need.

Am I approaching this issue in the wrong way / is there any other way of achieving this?

manast commented 3 years ago

This is probably a bug; it is a case we are lacking unit tests for. In theory it should work without any changes, but I guess we overlooked something.

germanoeich commented 3 years ago

@manast After some digging into the bull source code I found the issue; I am unsure what the best way to fix it is (with regard to the API that would be exposed).

On Queue, when you specify a groupKey, it grabs the value from the job data and embeds it into the job id. The moveToActive-8.lua script then reads it like so:

local groupKey = string.match(jobId, "[^:]+$")
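
For comparison, here is a minimal sketch of that plain Queue path, assuming a 1.x Queue that accepts limiter.groupKey in its options; the generated job id carries the group value as a ':'-separated suffix, which is exactly what the Lua pattern above matches:

import { Queue } from 'bullmq';

// Sketch, bullmq 1.x: with a Queue-level limiter.groupKey, Queue.add appends
// the group value taken from the job data to the generated job id.
const queue = new Queue('test', {
    limiter: { groupKey: 'guild' },
});

async function demo() {
    const job = await queue.add('regular-job', { guild: 1 });
    console.log(job.id); // e.g. "57:1" - the part after ":" is the group value
}

demo();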

However for FlowProducer, there is no way to set a groupKey when creating it, and FlowProducer.add lacks any logic to add this key to the jobId.

The way I see it, to fix this issue, bull must expose the limiter.groupKey option in one of these places:

1. The FlowProducer constructor (I think this would be the best option, since Queue exposes it in its constructor as well).
2. The FlowJob interface, which might be needed since flow jobs can span multiple queues and thus could need multiple groupKeys.

Of course, logic would need to be added to FlowProducer.add() as well; a sketch of both options follows below.
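
To make the two options concrete, here is a purely hypothetical sketch of the shapes they could take; none of this exists in 1.36.1 and the names are made up for illustration:

// Option 1 (hypothetical): FlowProducer takes the same limiter option as Queue,
// and FlowProducer.add() would suffix every generated job id with the value
// read from each job's data under groupKey, mirroring what Queue.add does.
interface FlowProducerOptionsProposal {
    limiter?: {
        groupKey: string;
    };
}

// Option 2 (hypothetical): each FlowJob carries its own groupKey, since the
// jobs of a single flow may live in different queues with different limiters.
interface FlowJobProposal {
    name: string;
    queueName: string;
    data?: Record<string, unknown>;
    groupKey?: string;
    children?: FlowJobProposal[];
}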

I'm willing to PR this, but I'd love some input on what the best way to fix it is, and whether there are any other issues with this approach.

As a workaround, one can set the job id manually in the format uuid:groupKeyValue, where groupKeyValue is the actual value passed in jobData.
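
A sketch of that workaround, assuming FlowJob.opts accepts a custom jobId and the worker is configured with groupKey: 'guild' as in the repro above:

import { randomUUID } from 'crypto';
import { FlowProducer } from 'bullmq';

// Workaround sketch: give every flow job an explicit id of the form
// "<uuid>:<group value>", so the Lua script can still extract the group key
// suffix even though FlowProducer never appends it itself.
const guild = 1;
const withGroupId = () => ({ jobId: `${randomUUID()}:${guild}` });

const producer = new FlowProducer();
producer.add({
    name: 'test',
    queueName: 'test',
    data: { guild },
    opts: withGroupId(),
    children: [
        {
            name: 'testd1-1',
            queueName: 'test',
            data: { guild },
            opts: withGroupId(),
        },
    ],
});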

roggervalf commented 3 years ago

@germanoeich please try the latest version, this feature is merged now.