taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Bug]: Excessive Redis Connections and waitUntilFinished Timeout in High-Load Environments #2749

Open dbas-dn opened 2 months ago

dbas-dn commented 2 months ago

Version

v5.12.12

Platform

NodeJS

What happened?

Environment:

Kubernetes: k3s with 3 nodes
Redis: Sentinel configuration with 3 nodes

Description:

Our system is designed to create a large number of queues. We noticed that while it's possible to pass an already established Redis connection when creating Queue, Job, and Worker instances, the QueueEvents class duplicates the passed connection using duplicate().

During our load tests, in which we ran between 100 and 600 queues, each queue created jobs that returned results. Under low load, waitUntilFinished worked as expected and returned the job result. Under high load, however, waitUntilFinished never returned; it hung until its TTL (the timeout passed to it) expired.

To debug this, I implemented a parallel polling mechanism that ran the scripts.isFinished script, which indicated that the job had indeed reached the completed status and I could retrieve its result, even though waitUntilFinished remained stuck.

Additionally, the excessive number of connections created by QueueEvents due to the duplicate() method led to us exceeding the maximum number of Redis connections, especially under high concurrency.
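A minimal sketch of one mitigation for the connection count, assuming a single long-lived QueueEvents per queue name suits your process: cache one instance per queue and share it across all waits, rather than creating one per job as the reproduction below does. `QueueEventsRegistry` and its methods are hypothetical names, not BullMQ APIs, and the factory is left generic so the sketch runs without Redis.

```typescript
// Minimal interface covering what the registry needs from a QueueEvents-like object.
interface Closable {
  close(): Promise<void>;
}

// Hypothetical helper: lazily creates and caches one instance per queue name,
// so every wait on the same queue shares a single instance (and connection).
class QueueEventsRegistry<T extends Closable> {
  private readonly instances = new Map<string, T>();
  private readonly factory: (queueName: string) => T;

  constructor(factory: (queueName: string) => T) {
    this.factory = factory;
  }

  // Return the cached instance for this queue, creating it on first use.
  get(queueName: string): T {
    let instance = this.instances.get(queueName);
    if (instance === undefined) {
      instance = this.factory(queueName);
      this.instances.set(queueName, instance);
    }
    return instance;
  }

  get size(): number {
    return this.instances.size;
  }

  // Close every cached instance, e.g. on shutdown, and empty the cache.
  async closeAll(): Promise<void> {
    const all = [...this.instances.values()];
    this.instances.clear();
    await Promise.all(all.map((instance) => instance.close()));
  }
}
```

With BullMQ, the factory might be `(name) => new QueueEvents(name, { connection })`. Each instance still duplicates the connection once, so the total is bounded by the number of queues rather than by the number of concurrent waits; with 600 queues that is still 600 connections.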

Workaround:

As a temporary workaround, I replaced waitUntilFinished with periodic execution of scripts.isFinished to check job completion.
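The polling workaround can be sketched as a generic helper. `pollUntil` is a hypothetical name; the `check` callback is where the job state would be queried, and it signals "not finished yet" by returning `undefined`.

```typescript
// Hypothetical polling helper: calls `check` repeatedly until it returns a
// value other than undefined, or rejects once `timeoutMs` has elapsed.
async function pollUntil<T>(
  check: () => Promise<T | undefined>,
  intervalMs: number,
  timeoutMs: number,
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const value = await check();
    if (value !== undefined) {
      return value;
    }
    if (Date.now() >= deadline) {
      throw new Error(`pollUntil timed out after ${timeoutMs}ms`);
    }
    // Wait between attempts so the check does not hammer Redis.
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

With BullMQ, `check` could avoid the internal `job['scripts']` member used in the reproduction below by calling public methods instead: fetch the job with `queue.getJob(jobId)`, call `job.getState()`, and once it reports `'completed'`, re-fetch the job and return an object wrapping its `returnvalue`, so that a job which legitimately returned nothing is not mistaken for an unfinished one. Polling trades latency (up to one `intervalMs`) and extra Redis round-trips for not depending on event delivery.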

Issue Summary:

waitUntilFinished does not return in high-load scenarios, even when the job is completed according to scripts.isFinished. QueueEvents creates redundant connections by duplicating the Redis connection, which leads to exceeding the maximum number of connections under high concurrency.

How to reproduce.

import { Queue, Worker, QueueEvents, Job } from 'bullmq';
import Redis, { RedisOptions } from 'ioredis';
import pQueue from 'p-queue';
import { setTimeout as sleep } from 'timers/promises';

jest.setTimeout(60000); // Set a longer timeout for high-load testing

describe('BullMQ waitUntilFinished bug reproduction', () => {
  let connectionOptions: RedisOptions;
  let queues: Queue[] = [];
  let workers: Worker[] = [];

  beforeAll(async () => {
    connectionOptions = {
      host: 'localhost', // Replace with your Redis host
      port: 6379,        // Replace with your Redis port
      password: '---', // Replace with your Redis password if any
      maxRetriesPerRequest: null,
      // Add any other Redis options here
    };
  });

  afterAll(async () => {
    await Promise.all([
      ...queues.map(queue => queue.obliterate({ force: true })),
      ...workers.map(worker => worker.close()),
    ]);
  });

  it('should reproduce the waitUntilFinished hang under high load', async () => {
    const concurrency = 20;
    const queueCount = 600;
    const jobCount = 1200;
    const connection = new Redis(connectionOptions);
    // Create multiple queues
    for (let i = 0; i < queueCount; i++) {
      const queue = new Queue(`test-queue-${i}`, { connection });
      const worker = new Worker(
        `test-queue-${i}`,
        async (job: Job) => {
          // Simulate some work
          await sleep(Math.random() * 1000);
          return `Result of ${job.name}`;
        },
        { connection }
      );
      queues.push(queue);
      workers.push(worker);
      await sleep(50);
    }

    const queue1 = new pQueue({ concurrency, autoStart: true });
    const queue2 = new pQueue({ concurrency, autoStart: true });

    // Add jobs to the queues
    const results = [];
    for (let i = 0; i < jobCount; i++) {
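      void queue1.add(async () => {
        const queueIndex = i % queueCount;
        // A new QueueEvents (and thus a duplicated Redis connection) per job
        const queueEvent = new QueueEvents(`test-queue-${queueIndex}`, { connection });
        const job = await queues[queueIndex].add(`job-${i}`, { foo: 'bar' });
        try {
          const result = await job.waitUntilFinished(queueEvent, 10_000);
          results.push(result);
        } catch (e) {
          // Cross-check via the internal script: the job is often already completed
          const finished = await job['scripts'].isFinished(job.id!, true);
          console.error(e);
          console.log(finished);
        } finally {
          // Close on every path so timed-out waits do not leak connections
          await queueEvent.close();
        }
      });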
    }
    for (let i = 0; i < jobCount; i++) {
      void queue2.add(async () => {
        try {
          const queueIndex = i % queueCount;
          await queues[queueIndex].add(`job-${i}`, { foo: 'bar' });
        } catch (e) {
          console.error(e);
        }
      });
    }
    await Promise.all([queue1.onIdle(), queue2.onIdle()]);
    // Check if all jobs have results (if not, it indicates an issue)
    expect(results.length).toBe(jobCount);
  });
});

Relevant log output

  console.error
    Error: Job wait job-0 timed out before finishing, no finish notification arrived after 10000ms (id=1)
        at onFailed (/node_modules/.pnpm/bullmq@5.12.3/node_modules/bullmq/src/classes/job.ts:1051:16)
        at /node_modules/.pnpm/bullmq@5.12.3/node_modules/bullmq/src/classes/job.ts:1035:13
        at Timeout.task (/node_modules/.pnpm/jsdom@20.0.3/node_modules/jsdom/lib/jsdom/browser/Window.js:520:19)
        at listOnTimeout (node:internal/timers:573:17)
        at processTimers (node:internal/timers:514:7)

      66 |         } catch (e) {
      67 |           job['scripts'].isFinished(job.id, true).then(v => {
    > 68 |             console.error(e);
         |                     ^
      69 |             console.log(v);
      70 |           });
      71 |         }

      at src/_tests_/bullmq-high-load.spec.ts:68:21

  console.log
    [ 1, '"Result of job-0"' ]

      at src/_tests_/bullmq-high-load.spec.ts:69:21


dbas-dn commented 1 month ago

Any reaction?

manast commented 1 month ago

waitUntilFinished is not a recommended API for production use: it does not scale, and it is not a proper way to design a queue-based architecture. See https://blog.taskforce.sh/do-not-wait-for-your-jobs-to-complete/

daimalou commented 1 month ago

@manast I am facing the same issue. I am developing a real-time AI and image conversion tool. Users push images to my server using Server-Sent Events; I push them to a queue, which processes them with a third-party API (subject to a QPS limit) and returns the converted result to the user once the job has finished.

I think waitUntilFinished is very useful when using Server-Sent Events.

manast commented 1 month ago

@daimalou it may be useful, but it is not the proper way to use queues. As I see it, you would be better off just spawning a NodeJS worker thread, running the job, and waiting for it to complete, rather than using a queue.
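A minimal sketch of the alternative suggested here, running the work in a Node.js worker thread and awaiting it directly, using the built-in `node:worker_threads` module. The inline worker body (which just upper-cases its input) and the `runInWorker` helper are hypothetical stand-ins for the real conversion work; a production service would more likely load a separate worker file and reuse a pool of threads.

```typescript
import { Worker } from "node:worker_threads";

// Inline worker body (plain CommonJS JavaScript, evaluated with `eval: true`).
// Hypothetical stand-in for the real image-conversion work.
const workerSource = `
  const { parentPort, workerData } = require("node:worker_threads");
  parentPort.postMessage(workerData.input.toUpperCase());
`;

// Spawns one worker thread per call and resolves with its first message.
function runInWorker(input: string): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: { input } });
    worker.once("message", (result: string) => resolve(result));
    worker.once("error", reject);
    // Rejecting after the promise has resolved is a no-op, so this only
    // matters if the worker exits without ever posting a message.
    worker.once("exit", (code) => {
      reject(new Error(`worker exited with code ${code} before replying`));
    });
  });
}
```

This keeps the request-and-wait shape without a queue, but it also gives up what the queue provides: persistence, retries, and distribution of work across processes.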

manast commented 1 month ago

@daimalou btw, SSE is used for sending data from the server to the client, so I'm not sure what you mean by using it to send images to the server :/. In any case, whether you use SSE or WebSockets does not matter: you can easily notify the client when a job has completed without relying on waitUntilFinished. You may need to redesign your solution a bit, but that's the proper way to build a scalable, issue-free system that runs stably for a long time.
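A minimal sketch of the event-driven design described above, assuming one long-lived QueueEvents per queue: a single 'completed'/'failed' listener routes each outcome to whichever client subscribed for that job ID (for example, a callback that writes to the client's SSE stream), instead of each request awaiting waitUntilFinished. `CompletionRouter`, `subscribe`, and `Notify` are hypothetical names. The `{ jobId, returnvalue }` and `{ jobId, failedReason }` payloads follow the arguments BullMQ's QueueEvents passes to its 'completed' and 'failed' listeners; a plain Node.js `EventEmitter` stands in for QueueEvents here so the sketch runs without Redis.

```typescript
import { EventEmitter } from "node:events";

// Payload shapes of the 'completed' and 'failed' events this router listens for.
interface CompletedEvent { jobId: string; returnvalue: string; }
interface FailedEvent { jobId: string; failedReason: string; }

type Outcome = { ok: true; value: string } | { ok: false; error: string };
type Notify = (outcome: Outcome) => void;

// Hypothetical router: one listener per queue, one map entry per pending job.
class CompletionRouter {
  private readonly pending = new Map<string, Notify>();

  constructor(events: EventEmitter) {
    events.on("completed", ({ jobId, returnvalue }: CompletedEvent) => {
      this.settle(jobId, { ok: true, value: returnvalue });
    });
    events.on("failed", ({ jobId, failedReason }: FailedEvent) => {
      this.settle(jobId, { ok: false, error: failedReason });
    });
  }

  // Register a callback for a job; the returned function unsubscribes it,
  // e.g. when the client disconnects before the job finishes.
  subscribe(jobId: string, notify: Notify): () => void {
    this.pending.set(jobId, notify);
    return () => {
      this.pending.delete(jobId);
    };
  }

  get pendingCount(): number {
    return this.pending.size;
  }

  // Deliver an outcome at most once, then forget the subscriber.
  private settle(jobId: string, outcome: Outcome): void {
    const notify = this.pending.get(jobId);
    if (notify === undefined) return; // no subscriber, or already disconnected
    this.pending.delete(jobId);
    notify(outcome);
  }
}
```

Because a fast job could finish before `queue.add()` resolves with its ID, one option is to choose the ID up front with the `jobId` job option and call `subscribe` before adding the job; the QueueEvents instance must also already be consuming events at that point.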

daimalou commented 1 month ago

@manast Yes, my description was a bit unclear. I use https://github.com/Azure/fetch-event-source, which can POST data to the server and receive the server's response as SSE. Thank you, I think what you said is correct. I need to implement more of the logic myself.