taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Bug]: Event missing during QueueEvent initializing #2874

Closed: chungjung-d closed this issue 2 weeks ago

chungjung-d commented 2 weeks ago

Version

v5.21.2

Platform

NodeJS

What happened?

For testing, I created Queue, Worker, and QueueEvents objects and used queueEvents.on to listen for events. However, in 3-4 out of 5 attempts, the first 1-2 events were lost. To address this, I tried awaiting queueEvents.waitUntilReady() so that I would only proceed once initialization had finished, but it didn't work as expected. Only by adding an artificial delay such as setTimeout could I get it to behave correctly.

After inspecting the code, I found that waitUntilReady() resolves with the RedisClient before QueueEvents has executed client.xread. (The issue seems to arise because the consumer's starting stream ID is initialized to $, i.e. "only entries newer than now"; if a job is added before the first XREAD is issued, its events land in the stream earlier and are never read.)
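For reference, the underlying XREAD behaviour can be demonstrated with plain ioredis, independent of BullMQ. This is only a minimal sketch; the stream name demo-stream is made up, and the host/port match the repro script below:

import Redis from 'ioredis';

async function demo() {
    const consumer = new Redis({ host: 'localhost', port: 10001 });
    const producer = new Redis({ host: 'localhost', port: 10001 });

    // Entry written BEFORE the consumer starts reading: with '$' it is skipped.
    await producer.xadd('demo-stream', '*', 'event', 'waiting-1');

    // '$' means "only entries added after this XREAD was issued".
    const pending = consumer.xread('BLOCK', 5000, 'STREAMS', 'demo-stream', '$');

    // Entry written AFTER the consumer is blocked on XREAD: this one is delivered.
    setTimeout(() => producer.xadd('demo-stream', '*', 'event', 'waiting-2'), 100);

    console.log(await pending); // contains only 'waiting-2'

    await consumer.quit();
    await producer.quit();
}

demo().catch(console.error);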

In the log output attached below, you can see that the waiting event for job 1 is missing. Debugging also revealed that the added event was dropped.

To address this issue, I propose modifying the waitUntilReady() method of QueueEvents so that it fulfills its Promise only once xread has actually been issued.

If possible, I would like to take on this issue and work on resolving it.
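In the meantime, a user-land workaround is possible: keep adding throwaway delayed jobs until one of their delayed events is actually observed, which proves the XREAD loop is live. The sketch below is only an illustration of that idea, not a BullMQ API; the waitUntilConsuming helper, the probe job name, and the 200 ms retry window are invented for the example.

import { Queue, QueueEvents } from 'bullmq';

// Resolves once QueueEvents demonstrably receives events, i.e. once the XREAD
// loop is running. Probe jobs carry a long delay so the worker never runs them.
async function waitUntilConsuming(queue, queueEvents) {
    await queueEvents.waitUntilReady();
    for (;;) {
        let onDelayed;
        const seen = new Promise((resolve) => {
            onDelayed = () => resolve(true);
            queueEvents.once('delayed', onDelayed);
        });
        const probe = await queue.add('probe', {}, { delay: 60000 });
        const live = await Promise.race([
            seen,
            new Promise((resolve) => setTimeout(() => resolve(false), 200)),
        ]);
        queueEvents.off('delayed', onDelayed);
        await probe.remove().catch(() => {}); // best-effort cleanup of the probe
        if (live) return; // an event arrived, so the consumer is live from here on
    }
}

Awaiting waitUntilConsuming(queue, queueEvents) before the queue.add loop in the repro below should make the early waiting events show up, at the cost of a few probe jobs.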

How to reproduce

Run this script

import { Queue, QueueEvents, Worker } from "bullmq";

async function main() {

    const queue = new Queue('footprint', {
        connection: {
            host: 'localhost',
            port: 10001
        }
    });

    const queueEvents = new QueueEvents('footprint', {
        connection: {
            host: 'localhost',
            port: 10001
        }
    });

    const worker = new Worker('footprint', async (job) => {
        await job.updateProgress(42);

        console.log(`Processing job with id ${job.id}`);
        return { result: 'Success' };
    }, {
        connection: {
            host: 'localhost',
            port: 10001
        }
    });

    // Wait for all Redis connections to be established. As described above,
    // this does NOT guarantee that QueueEvents has issued its first XREAD yet.
    await Promise.all([
        queue.waitUntilReady(),
        queueEvents.waitUntilReady(),
        worker.waitUntilReady()
    ]);

    queueEvents.on("waiting", (jobId) => {
        console.log('QueueEvent: Job waiting', jobId);
    });

    queueEvents.on('completed', ({ jobId }) => {
        console.log('QueueEvent: Job completed', jobId);
    });

    queueEvents.on('drained', () => {
        console.log('QueueEvent: Queue drained');
    });

    queueEvents.on('progress', ({ jobId, data }) => {
        console.log('QueueEvent: Job progress', jobId, data);
    });

    // Jobs are added immediately afterwards; their earliest events can be
    // written to the stream before the first XREAD, and with '$' they are lost.
    for (let i = 0; i < 2; i++) {
        await queue.add('test', { foo: 'bar' });
    }

    await new Promise(() => {});
}

main().catch((error) => console.error(error));

Relevant log output

QueueEvent: Job waiting {
  jobId: "2",
}
Processing job with id 1
QueueEvent: Job progress 1 42
QueueEvent: Job completed 1
Processing job with id 2
QueueEvent: Job progress 2 42
QueueEvent: Job completed 2
QueueEvent: Queue drained


manast commented 2 weeks ago

What is happening in this case is that although you are waiting for the Redis connection to be established for the QueueEvents instance, the XREAD command has not been issued yet by the time you have managed to add some jobs to the queue.

Although your case demonstrates a weird behaviour in a synthetic scenario, it is important to understand that at the end of the day this is a distributed system, and some things are just not going to work as you expect. For instance, if you were running QueueEvents in a different process or on a different machine altogether (which is the actual use case for QueueEvents), how would you guarantee that the call to XREAD happens before the calls to Queue for adding jobs? This is simply not possible. In fact, "waitUntilReady" is a helper function for running our test suite; it is not really useful in practice.

If you want a steady stream of events without losing any, you could try using the lastId option: start with "0-0", which is the actual start of the stream (instead of the default $, which is the last item in the stream), and store the last seen ID in some persistent storage. It is quite difficult to provide better guarantees than this. QueueEvents is intended mostly for debugging, or for cases where losing some events in certain edge cases is acceptable; for better guarantees you have the Queue and Worker classes.

I am closing this issue now, as there is unfortunately nothing else that can be done here.
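To make that suggestion concrete: the option manast refers to as lastId appears in the TypeScript typings as lastEventId on QueueEventsOptions (worth verifying against your installed version), and every QueueEvents listener receives the raw stream event ID as its second argument. The file-based cursor below and its helper names are purely illustrative:

import { promises as fs } from 'fs';
import { QueueEvents } from 'bullmq';

const CURSOR_FILE = './footprint.last-event-id'; // illustrative persistence only

async function loadLastId() {
    try {
        return (await fs.readFile(CURSOR_FILE, 'utf8')).trim();
    } catch {
        return '0-0'; // no cursor yet: start from the very beginning of the stream
    }
}

async function main() {
    const queueEvents = new QueueEvents('footprint', {
        connection: { host: 'localhost', port: 10001 },
        lastEventId: await loadLastId(), // resume from the stored cursor instead of '$'
    });

    // Persisting the stream ID of each handled event lets the next run resume
    // without a gap (duplicates are still possible if the process dies between
    // handling an event and persisting its ID).
    queueEvents.on('waiting', async ({ jobId }, id) => {
        console.log('QueueEvent: Job waiting', jobId);
        await fs.writeFile(CURSOR_FILE, id);
    });
}

main().catch(console.error);

Note that starting at "0-0" replays the entire existing stream on the first run, so expect a burst of historical events; after that, the stored cursor keeps the consumer gap-free across restarts.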