taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

BullMQ Pro groups: Sequential execution #1040

Open hardcodet opened 2 years ago

hardcodet commented 2 years ago

I'm evaluating Pro right now because we need sequential execution of jobs in order to avoid concurrency issues. I thought that Bull's group IDs might help for this scenario.

I did a quick test on groups, and it appears they are being scheduled in parallel (given concurrency allows this). I'm not sure whether that's a misconception or misconfiguration on my side, but just starting jobs in sequence doesn't help regarding race conditions or jobs hogging all resources. Also, I'm not sure if groups are all but ignored. Here's my test script:

        // create queue with callback function
        await this.queueService.createQueue("foo", async (c: {id: string}) => {
            console.log(`enter ${c.id}`);
            await PromiseUtil.delay(3000);
            console.log(`exit ${c.id}`);
        });

        // register jobs in groups "a" and "b"
        await this.queueService.enqueueJob("foo", { id: "a1" }, "a");
        await this.queueService.enqueueJob("foo", { id: "a2" }, "a");
        await this.queueService.enqueueJob("foo", { id: "a3" }, "a");
        await this.queueService.enqueueJob("foo", { id: "a4" }, "a");

        await this.queueService.enqueueJob("foo", { id: "b1" }, "b");
        await this.queueService.enqueueJob("foo", { id: "b2" }, "b");

With a concurrency of 2, it just seems to process them in order without looking at the groups in the first place. Here's my output - as you can see, the jobs are just executed in submission order, even though I'm setting the jobOptions.group flag:

enter a1
enter a2
exit a1
exit a2
enter a3
enter a4
exit a3
exit a4
enter b1
enter b2
exit b1
exit b2

Side note: if I also create a limiter on the worker, this seems to kill execution order completely. Here's the console output of a run with a limiter active (note that "a2" is executing after a3 and a4 despite being scheduled second):

enter a1
enter b1
enter a3
exit a1
enter a4
exit b1
enter a2
exit a3
enter b2
exit a4
exit a2
exit b2

Here's my queue setup, btw:

        const connection = new IORedis(this.config.redisUri);
        const queue = new QueuePro<T>(queueName, { connection });
        const scheduler = new QueueScheduler(queueName, { connection });
        const worker = new WorkerPro(queueName, async (job: Job) => {
            // no try/catch needed - we're listening on the failed event of the worker
            await callback(job.data, job);
        }, { connection, concurrency: 3 });

And here's how I schedule the jobs:

    public async enqueueJob<T>(queueName: string, payload: T, groupId?: string, options?: JobExecutionOptions): Promise<string> {
        const queue: QueuePro = this.getQueue<T>(queueName);

        const jobId = `${queueName}_${Uuid.create()}`;

        const jobOptions: JobsProOptions = {
            jobId,
            delay: options && options.delay,
            removeOnComplete: true,
            attempts: options && options.maxRetries || 10,
            backoff: options && options.retryBackoff || 5000,
            repeat: options && options.repeat,
        };

        if (groupId) {
            jobOptions.group = { id: groupId };
        }

        await queue.add(queueName, payload, jobOptions);
        return jobId;
    }

Thanks for your advice :)

manast commented 2 years ago

You are correct in your assumption that currently, groups do not support concurrency per group. Interestingly we have just received this same request from another user so we have started to figure out a design for this feature since we realize it is quite important in some use cases: https://github.com/taskforcesh/bullmq-pro/issues/30

Regarding your test code above, I have not tested it yet, but one "problem" I see is that since the jobs execute so fast, they are probably completed before you add the next job, and are therefore executed in the same order as you add them. You will need to simulate a queue that already has all the jobs in it before running a worker in order to verify that the jobs are processed in group order.

hardcodet commented 2 years ago

Yeah, concurrency would make or break Pro for us due to race conditions. Unfortunately, I can't open the link, I assume the repo is private?

"I see is that since the jobs execute so fast, they are probably completed before you add the next job"

I'm afraid that's not it. Every job has an await of 3 seconds, so all jobs were actually added before the first one completed. Here's the console output with timestamps and an additional console output for when all the jobs were created:

enter a1 2022-01-31T08:36:15.611Z
enter a2 2022-01-31T08:36:15.756Z
all jobs enqueued
exit a1
exit a2
enter a3 2022-01-31T08:36:18.766Z
enter a4 2022-01-31T08:36:18.902Z
exit a3
exit a4
enter b1 2022-01-31T08:36:21.924Z
enter b2 2022-01-31T08:36:22.056Z
exit b1
exit b2

Let me know if I can support you further in triangulating this.

manast commented 2 years ago

Actually, it seems like the result you are getting is correct, because you actually have 2 different queues, "a" and "b". I was misled by this since I thought you were using different groups in the same queue.

hardcodet commented 2 years ago

Unfortunately not - your first assumption is correct. I have only one queue (called foo). The strings a and b are indeed the group IDs.

manast commented 2 years ago

Ah OK, you are right. I just could not get your code working as is, so I wrote this one, which works as expected:

import { QueuePro, QueueScheduler, WorkerPro } from "@taskforcesh/bullmq-pro";

const queueName = "test";

const connection = { host: "localhost" };

const queue = new QueuePro(queueName, { connection });

// Not needed in this example.
const scheduler = new QueueScheduler(queueName, { connection });

async function enqueueJob(groupId, payload) {
  await queue.add(queueName, payload, { group: { id: groupId } });
}

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function enqueueJobs() {
  for (let g = 0; g < 10; g++) {
    for (let i = 0; i < 10; i++) {
      await enqueueJob(g, { id: `${g}-${i}` });
    }
  }
}

async function run() {
  console.log("Adding jobs");
  await enqueueJobs();
  console.log("Jobs added");

  const worker = new WorkerPro(
    queueName,
    async (job) => {
      console.log(job.data.id);
      await delay(1000);
    },
    { connection, concurrency: 3 }
  );
}

run();

Results:

Adding jobs
Jobs added
0-0
1-0
2-0
3-0
4-0
5-0
6-0
7-0
8-0
9-0
0-1
1-1
2-1
3-1
4-1
5-1
6-1
7-1
8-1
9-1
0-2
1-2
2-2
3-2
4-2
5-2
6-2
7-2
8-2
9-2
0-3
1-3
2-3
3-3
4-3
5-3
6-3
7-3
8-3
9-3
0-4
1-4
2-4
3-4
4-4
5-4
6-4
7-4
8-4
9-4
0-5
1-5
2-5
3-5
4-5
5-5
6-5
7-5
8-5
9-5
0-6
1-6
2-6
3-6
4-6
5-6
6-6
7-6
8-6
9-6
0-7
1-7
2-7
3-7
4-7
5-7
6-7
7-7
8-7
9-7
0-8
1-8
2-8
3-8
4-8
5-8
6-8
7-8
8-8
9-8
0-9
1-9
2-9
3-9
4-9
5-9
6-9
7-9
8-9
9-9

hardcodet commented 2 years ago

A big difference I'm seeing is that you are creating your worker after you've enqueued your jobs. In my case (which is probably more real-life), the worker is created up-front (as part of the application bootstrapping), since the queue and worker are being set up to handle requests that will come in through API calls. Could that be the reason?

I changed my code to mimic your job enqueuing method, and as you can see, the behaviour is completely different. Another observation: it took a long time until all jobs were enqueued (only after 2-5, and I also had that 1 second delay). The fact that jobs were already being processed seems to have a massive impact on scheduling time. Here's my console output:

0-0
0-1
0-2
0-3
0-4
0-5
0-6
0-7
0-8
0-9
1-0
1-1
1-2
1-3
1-4
1-5
1-6
1-7
1-8
1-9
2-0
2-1
2-2
2-3
2-4
2-5
Jobs added
2-6
2-7
2-8
2-9
3-0
3-1
3-2
3-3
3-4
[.... rest omitted]

This was with a concurrency of 2. When raising it to 3, it took even longer until the enqueuing loop completed:

3-7
3-8
Jobs added
3-9
4-0

hardcodet commented 2 years ago

If that helps, here's my full queue service that works on top of BullMQ. Will hopefully transpile out of the box after fixing the two TODOs:

import { Job, QueueScheduler } from "bullmq";
import { JobsProOptions, QueuePro, WorkerPro } from "@taskforcesh/bullmq-pro";
import IORedis = require("ioredis");

/**
 * Simplified job execution options.
 */
export interface JobExecutionOptions {
    /**
     * Maximum amount of retries until the job is regarded failed.
     */
    maxRetries?: number;

    /**
     * Linear retry backoff.
     */
    retryBackoff?: number;

    /**
     * A delay in ms until the job starts getting executed.
     */
    delay?: number;

    /**
     * Repeat job in a given interval or according to a cron specification.
     */
    repeat?: CronRepeatOptions | EveryRepeatOptions;
}

export interface RepeatOptions {
    /**
     * Timezone
     */
    tz?: string;

    /**
     * End date when the repeat job should stop repeating
     */
    endDate?: Date | string | number;

    /**
     * Number of times the job should repeat at max.
     */
    limit?: number;
}

export interface EveryRepeatOptions extends RepeatOptions {
    /**
     * Repeat every millis (cron setting cannot be used together with this setting.)
     */
    every: number;
}

export interface CronRepeatOptions extends RepeatOptions {
    /**
     * Cron pattern specifying when the job should execute
     */
    cron: string;

    /**
     * Start date when the repeat job should start repeating (only with cron).
     */
    startDate?: Date | string | number;
}

interface QueueSetup {
    queue: QueuePro;
    scheduler: QueueScheduler;
    worker: WorkerPro;
}

export class QueueService {

    private queues: QueueSetup[] = [];

    /**
     * Creates a queue for a given type of jobs, and registers a callback
     * handler for it. This will also start receiving pending jobs if the queue
     * already exists in the data store.
     */
    public createQueue<T>(queueName: string, callback: (item: T, job: Job<T>) => Promise<void>) {

        // check if we already have that queue - don't allow duplicate registrations
        if (this.getQueue<T>(queueName)) {
            // this.logger.fatal(`Multiple queue creation requests for queue with name ${queueName} submitted.`);
            throw new Error(`Queue ${queueName} already exists.`);
        }

        // create a new queue and cache it
        const connection = new IORedis(this.config.redisUri);  // TODO setup
        const queue = new QueuePro<T>(queueName, { connection });
        const scheduler = new QueueScheduler(queueName, { connection });
        const worker = new WorkerPro(queueName, async (job: Job) => {
            // no try/catch needed - we're listening on the failed event of the worker
            await callback(job.data, job);
        }, { connection, concurrency: 3 });

        worker.on("failed", (job: Job, error: Error) => {
            const payload = {
                name: queueName + "-jobData",
                data: job.data,
            };

            if (job.attemptsMade < job.opts.attempts) {
                // this.logger.error(`Unexpected error on Bull job ${job.id} in queue ${queueName} (attempt ${job.attemptsMade} of ${job.opts.attempts}).`, error);
            } else {
                // this.logger.fatal(`Bull job ${job.id} in queue ${queueName} terminally failed after ${job.attemptsMade} retry attempts.`, error, payload);
            }
        });

        queue.on("error", (err: Error) => {
            // this.logger.fatal("Bull message queue error!", err);
        });

        this.queues.push({ queue, scheduler, worker });
        // this.logger.debug("Created Bull queue " + queueName);
    }

    /**
     * Enqueues a job with an optional delay or repeat pattern.
     * @param queueName The name of the queue - must match a previously registered one.
     * @param payload The payload that is injected into the worker function of the queue.
     * @param groupId An optional group ID: Jobs that share a given ID are guaranteed to execute sequentially.
     * @param options Additional job options.
     */
    public async enqueueJob<T>(queueName: string, payload: T, groupId?: string, options?: JobExecutionOptions): Promise<string> {
        const queue = this.getQueue<T>(queueName);

        if (!queue) {
            const msg = `Job request for unknown queue ${queueName} submitted - something's broken!`;
            // this.logger.fatal(msg);
            throw new Error(msg);
        }

        const jobId = `${queueName}_${Uuid.create()}`; // TODO replace UUID

        const jobOptions: JobsProOptions = {
            jobId,
            delay: options && options.delay,
            removeOnComplete: true,
            attempts: options && options.maxRetries || 10,
            backoff: options && options.retryBackoff || 5000,
            repeat: options && options.repeat,
        };

        if (groupId) {
            jobOptions.group = { id: groupId };
        }

        await queue.add(queueName, payload, jobOptions);
        return jobId;
    }

    private getQueue<T>(queueName: string): QueuePro | undefined {
        return this.queues.find(q => q.queue.name === queueName)?.queue;
    }
}

manast commented 2 years ago

Not sure I understand your issue, actually. My test case waits for the jobs to be added so that we can easily see the round-robin behavior. If the jobs arrive randomly, they are processed as they come, and it is not until the queue starts accumulating jobs that you can see the round-robin behavior. As for example code, it is better if you can provide the simplest code that reproduces the issue; everything that is not relevant just gets in the way and makes it more difficult to understand.
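
To make the round-robin expectation concrete, here is a toy in-memory model. It is only an illustration: BullMQ Pro keeps its groups in Redis, and nothing here reflects its actual implementation. The `roundRobin` function and the `ToyJob` type are made up for this sketch. Jobs are bucketed per group, and one job is taken from each non-empty group per pass.

```typescript
// Toy model only: BullMQ Pro implements groups in Redis, not like this.
type ToyJob = { group: string; id: string };

function roundRobin(jobs: ToyJob[]): string[] {
  // Bucket job IDs per group, remembering the order in which groups first appear.
  const buckets: Record<string, string[]> = {};
  const groupOrder: string[] = [];
  for (const job of jobs) {
    if (!buckets[job.group]) {
      buckets[job.group] = [];
      groupOrder.push(job.group);
    }
    buckets[job.group].push(job.id);
  }

  // Take one job from each non-empty group per pass until everything is drained.
  const order: string[] = [];
  while (order.length < jobs.length) {
    for (const group of groupOrder) {
      const next = buckets[group].shift();
      if (next !== undefined) {
        order.push(next);
      }
    }
  }
  return order;
}

// Same enqueue pattern as the test script: all of group 0, then group 1, then group 2.
const toyJobs: ToyJob[] = [];
for (let g = 0; g < 3; g++) {
  for (let i = 0; i < 2; i++) {
    toyJobs.push({ group: String(g), id: `${g}-${i}` });
  }
}

const toyOrder = roundRobin(toyJobs);
console.log(toyOrder.join(" ")); // 0-0 1-0 2-0 0-1 1-1 2-1
```

Note that the interleaving only shows up once several groups have jobs waiting at the same time, which is the point made above about the queue needing to accumulate jobs.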

hardcodet commented 2 years ago

I see your point, but I think your setup may be too simple to reveal the issues I'm seeing. My expectation was that

In any case, I'll try to write a minimalistic repro script later and post it here. Might take me until tomorrow, though.

manast commented 2 years ago

If I instantiate the worker before adding the jobs I still get the correct round-robin behavior. The order of the jobs is different though, it is still too early in the morning so I have not yet figured that out 😅.

UPDATE: OK, I figured it out. I was using a "dirty" queue with old jobs from a previous test run. As you can see, it behaves correctly. Don't you get the same results with my example code?

import { QueuePro, QueueScheduler, WorkerPro } from "@taskforcesh/bullmq-pro";

const queueName = "test";

const connection = { host: "localhost" };

const queue = new QueuePro(queueName, { connection });
const scheduler = new QueueScheduler(queueName, { connection });

async function enqueueJob(groupId, payload) {
  await queue.add(queueName, payload, { group: { id: groupId } });
}

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function enqueueJobs() {
  for (let g = 0; g < 10; g++) {
    for (let i = 0; i < 10; i++) {
      await enqueueJob(g, { id: `${g}-${i}` });
    }
  }
}

const worker = new WorkerPro(
  queueName,
  async (job) => {
    console.log(job.data.id);
    await delay(1000);
  },
  { connection, concurrency: 3 }
);

async function run() {
  console.log("Adding jobs");
  await enqueueJobs();
  console.log("Jobs added");
}

run();

Gives:

Adding jobs
0-0
0-1
0-2
Jobs added
0-3
1-0
2-0
3-0
4-0
5-0
6-0
7-0
8-0
9-0
0-4
1-1
2-1
3-1
4-1
5-1
6-1
7-1
8-1
9-1
0-5
1-2
2-2
3-2
4-2
5-2
6-2
7-2
8-2
9-2
0-6
1-3
2-3
3-3
4-3
5-3
6-3
7-3
8-3
9-3
0-7
1-4
2-4
3-4
4-4
5-4
6-4
7-4
8-4
9-4
0-8
1-5
2-5
3-5
4-5
5-5
6-5
7-5
...

hardcodet commented 2 years ago

I adjusted your example a little to add some randomness into the delays and I think I got it to break. Some observations:

import { QueuePro, QueueScheduler, WorkerPro } from "@taskforcesh/bullmq-pro";

const queueName = "pro-test";

const connection = { host: "localhost" };

const queue = new QueuePro(queueName, { connection });

// Not needed in this example.
const scheduler = new QueueScheduler(queueName, { connection });

async function enqueueJob(groupId, payload) {
    await queue.add(queueName, payload, { group: { id: groupId } });
}

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function enqueueJobs() {
    for (let g = 0; g < 3; g++) {
        for (let i = 0; i < 10; i++) {
            await enqueueJob(g, { group: g, id: `g${g}-#${i}` });
        }
    }
}

export async function runQueueTest() {
    console.log(new Date().toISOString() + ": Adding jobs");
    await enqueueJobs();
    console.log(new Date().toISOString() + ": Jobs added");

    const tracker = new Map<number, string>();

    const worker = new WorkerPro(
        queueName,
        async (job) => {
            const id = job.data.id;
            const group = job.data.group;

            console.log(new Date().toISOString() + ": enter " + job.data.id);
            if (tracker.get(group)) {
                console.warn(`${new Date().toISOString()}: Group ${group} currently runs ${tracker.get(group)}`);
            }
            tracker.set(group, id);
            await delay(Math.random() * 3000);
            console.log(new Date().toISOString() + ": exit " + job.data.id);
            tracker.set(group, null);
        },
        { connection, concurrency: 5 },
    );
}

Output from this test:

2022-02-17T10:45:05.802Z: Adding jobs
2022-02-17T10:45:10.110Z: Jobs added
BullMQ: DEPRECATION WARNING! Your redis options maxRetriesPerRequest must be null and enableReadyCheck false. On the next versions having this settings will throw an exception
BullMQ: DEPRECATION WARNING! Your redis options maxRetriesPerRequest must be null and enableReadyCheck false. On the next versions having this settings will throw an exception
2022-02-17T10:45:10.539Z: enter g0-#0
2022-02-17T10:45:10.683Z: enter g1-#0
2022-02-17T10:45:10.829Z: enter g2-#0
2022-02-17T10:45:10.972Z: enter g0-#1
2022-02-17T10:45:10.972Z: Group 0 currently runs g0-#0
2022-02-17T10:45:11.077Z: exit g2-#0
2022-02-17T10:45:11.115Z: enter g1-#1
2022-02-17T10:45:11.115Z: Group 1 currently runs g1-#0
2022-02-17T10:45:11.220Z: enter g2-#1
2022-02-17T10:45:11.540Z: exit g1-#0
2022-02-17T10:45:11.681Z: enter g0-#2
2022-02-17T10:45:11.681Z: Group 0 currently runs g0-#1
2022-02-17T10:45:12.704Z: exit g1-#1
2022-02-17T10:45:12.849Z: enter g1-#2
2022-02-17T10:45:13.030Z: exit g0-#0
2022-02-17T10:45:13.173Z: enter g2-#2
2022-02-17T10:45:13.174Z: Group 2 currently runs g2-#1
2022-02-17T10:45:13.575Z: exit g2-#1
2022-02-17T10:45:13.719Z: enter g0-#3
2022-02-17T10:45:13.885Z: exit g0-#1
2022-02-17T10:45:14.028Z: enter g1-#3
2022-02-17T10:45:14.028Z: Group 1 currently runs g1-#2
2022-02-17T10:45:14.597Z: exit g0-#2
2022-02-17T10:45:14.644Z: exit g2-#2
2022-02-17T10:45:14.741Z: enter g2-#3
2022-02-17T10:45:14.786Z: enter g0-#4
2022-02-17T10:45:15.465Z: exit g2-#3
2022-02-17T10:45:15.541Z: exit g1-#2
2022-02-17T10:45:15.610Z: enter g1-#4
2022-02-17T10:45:15.685Z: enter g2-#4
2022-02-17T10:45:16.117Z: exit g0-#4
2022-02-17T10:45:16.262Z: enter g0-#5
2022-02-17T10:45:16.394Z: exit g1-#4
2022-02-17T10:45:16.536Z: exit g2-#4
2022-02-17T10:45:16.537Z: enter g1-#5
2022-02-17T10:45:16.564Z: exit g0-#3
2022-02-17T10:45:16.680Z: enter g2-#5
2022-02-17T10:45:16.705Z: enter g0-#6
2022-02-17T10:45:16.720Z: exit g2-#5
2022-02-17T10:45:16.868Z: enter g1-#6
2022-02-17T10:45:16.868Z: Group 1 currently runs g1-#5
2022-02-17T10:45:16.968Z: exit g1-#3
2022-02-17T10:45:17.109Z: enter g2-#6
2022-02-17T10:45:17.139Z: exit g1-#6
2022-02-17T10:45:17.282Z: enter g0-#7
2022-02-17T10:45:17.282Z: Group 0 currently runs g0-#6
2022-02-17T10:45:17.788Z: exit g0-#6
2022-02-17T10:45:17.926Z: exit g1-#5
2022-02-17T10:45:17.931Z: enter g1-#7
2022-02-17T10:45:18.345Z: enter g0-#8
2022-02-17T10:45:19.139Z: enter g2-#8
2022-02-17T10:45:19.411Z: exit g2-#6
2022-02-17T10:45:19.633Z: enter g1-#9
2022-02-17T10:45:19.633Z: Group 1 currently runs g1-#8
2022-02-17T10:45:19.721Z: exit g0-#8
2022-02-17T10:45:19.862Z: enter g2-#9
2022-02-17T10:45:20.000Z: exit g2-#8
2022-02-17T10:45:20.683Z: exit g1-#9
2022-02-17T10:45:20.777Z: exit g2-#9
2022-02-17T10:45:20.853Z: exit g1-#8
2022-02-17T10:45:21.754Z: exit g0-#9
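
One caveat about the tracker in the script above: it stores only the most recent job ID per group and is cleared by whichever job of that group exits first, so it can under-report overlaps. In the log, for example, `g0-#3` enters without a warning even though `g0-#1` and `g0-#2` have not exited yet. A stricter check could keep the full set of in-flight jobs per group. The `GroupOverlapTracker` class below is a hypothetical helper for test scripts, not part of BullMQ:

```typescript
// Hypothetical helper for test scripts; not part of BullMQ or BullMQ Pro.
class GroupOverlapTracker {
  // Job IDs currently running, keyed by group.
  private active: Record<string, string[]> = {};

  // Call when a job starts. Returns the IDs already running in the same group.
  enter(group: string, jobId: string): string[] {
    const running = this.active[group] || [];
    this.active[group] = running.concat(jobId);
    return running.slice();
  }

  // Call when a job finishes (ideally from a finally block in the processor).
  exit(group: string, jobId: string): void {
    const running = this.active[group] || [];
    this.active[group] = running.filter((id) => id !== jobId);
  }
}

// Replays the first group 0 events from the log above, in timestamp order.
const overlapTracker = new GroupOverlapTracker();
const atJob0 = overlapTracker.enter("0", "g0-#0"); // []
const atJob1 = overlapTracker.enter("0", "g0-#1"); // ["g0-#0"]
const atJob2 = overlapTracker.enter("0", "g0-#2"); // ["g0-#0", "g0-#1"]
overlapTracker.exit("0", "g0-#0");
const atJob3 = overlapTracker.enter("0", "g0-#3"); // ["g0-#1", "g0-#2"]

console.log(atJob3.join(", ")); // g0-#1, g0-#2
```
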

manast commented 2 years ago

I wonder, what Redis instance are you using for these tests? Is it a local instance? And which version?

I cannot reproduce anything so far, it adds the jobs within a second, and even with 50 as concurrency the jobs are all processed in correct order for me.

hardcodet commented 2 years ago

I'm using hosted Redis for local development; maybe that one is throttled a little (I'm on the free offering). That still wouldn't explain the different behavior, though... I sent you my connection string directly for a repro. Let's first rule out that it's the Redis version.
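
As a rough back-of-the-envelope check on the latency theory, the timestamps in the log above give the cost per `queue.add` call. This assumes, as in the test script, that every add is awaited before the next one is issued; the variable names are only for illustration.

```typescript
// Timestamps copied from the "Adding jobs" / "Jobs added" lines in the log above.
const startedAt = Date.parse("2022-02-17T10:45:05.802Z");
const finishedAt = Date.parse("2022-02-17T10:45:10.110Z");
const jobCount = 3 * 10; // 3 groups x 10 jobs, each added with its own awaited call

const elapsedMs = finishedAt - startedAt;
const msPerAdd = elapsedMs / jobCount;

console.log(`${elapsedMs} ms total, ~${msPerAdd.toFixed(1)} ms per add`);
// 4308 ms total, ~143.6 ms per add
```

That figure is close to the roughly 140 to 150 ms gaps between consecutive "enter" lines in the same log, which would fit a remote instance with noticeable round-trip time. It says nothing by itself about why jobs of the same group overlap.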

billoneil commented 4 months ago

It sounds like setting a group concurrency of 1 would solve this, correct? https://docs.bullmq.io/bullmq-pro/groups/concurrency
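
For reference, a minimal sketch of what such worker options might look like. The `group.concurrency` shape is an assumption taken from the docs page linked above and should be checked against the BullMQ Pro version in use; the queue name and the `processJob` function in the commented-out usage are placeholders.

```typescript
// Assumed option shape, based on the group concurrency docs page linked above.
// Verify against the BullMQ Pro version you run before relying on it.
const workerOptions = {
  connection: { host: "localhost" },
  concurrency: 5,            // overall parallelism of this worker
  group: { concurrency: 1 }, // intended: at most one active job per group
};

// Hypothetical usage (needs @taskforcesh/bullmq-pro and a reachable Redis instance):
// const worker = new WorkerPro("pro-test", processJob, workerOptions);

console.log(JSON.stringify(workerOptions.group));
```

With this setting the overall `concurrency` can stay above 1, so different groups would still be processed in parallel while jobs within one group run one at a time.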