weyoss / redis-smq

A simple high-performance Redis message queue for Node.js.
MIT License
588 stars 64 forks source link

Receive message on the other queue when restart #30

Closed mmsqe closed 5 years ago

mmsqe commented 5 years ago

Steps to reproduce:

  1. Send message to one Queue1
  2. Consumer1 crashes due to error
  3. Both Queue1 and Queue2 receive the message from Step1

ps: redis is running with aof

weyoss commented 5 years ago

Thank you for reporting this issue!

The cause of this bug has been identified (as being related to GC) and fixed.

A new release containing this hot-fix will be published today ASAP.

mmsqe commented 5 years ago

Thanks for the quick response, but issue still exists with commit https://github.com/weyoss/redis-smq/commit/64087f154ec40dbdc65e9fa72594970b02351f81

weyoss commented 5 years ago

I believe that the issue has been fixed.

I made a little script just to make sure that everything works as expected:

'use strict';

const { Consumer, Producer } = require('redis-smq');

function getConsumer(queueName, crashOnConsume = false) {
    const InstanceClass = class extends Consumer {
        consume(message, cb) {
            if (crashOnConsume) {
                console.log(`Got message: ${JSON.stringify(message)}, queue: ${this.constructor.queueName}. Simulating crash...`);
                console.log(`Please wait for another consumer to recover from this failure. It would take some time...`);
                this.stop();
            } else {
                console.log(`Got message ${JSON.stringify(message)} from Queue ${this.constructor.queueName}. Acknowledging...`);
                cb();
            }
        }
    };
    InstanceClass.queueName = queueName;
    return new InstanceClass();
}

function getProducer(queueName) {
    return new Producer(queueName);
}

// This creates a consumer which acknowledges all received messages on queue 'queue_b'
const consumerQueueB = getConsumer('queue_b');
consumerQueueB.run();

// Produce a message on queue 'queue_b'
const producerQueueB = getProducer('queue_b');
producerQueueB.produce({ id: 'b', hello: 'world' }, (err) => {
    if (err) {
        throw err;
    }
});

// This creates a consumer which "crashes" once a message is received
const consumer1QueueA = getConsumer('queue_a', true);

/**
 * Start an other consumer instance on queue 'queue_a'
 * just after the first consumer1QueueA crashed...
 *
 * Here we just listen to the 'halt' event which get emitted once consumer1QueueA goes down.
 */
consumer1QueueA.on('halt', () => {
    const consumer2QueueA = getConsumer('queue_a');
    consumer2QueueA.run();
});

// Start waiting for new messages...
consumer1QueueA.run();

// Produce a message on queue 'queue_a'
const producer1QueueA = getProducer('queue_a');
producer1QueueA.produce({ id: 'a', hello: 'world' }, (err) => {
    if (err) {
        throw err;
    }
});

The script output should be like:

Got message {"id":"b","hello":"world"} from Queue queue_b. Acknowledging...
Got message: {"id":"a","hello":"world"}, queue: queue_a. Simulating crash...
Please wait for another consumer to recover from this failure. It would take some time...
Got message {"id":"a","hello":"world"} from Queue queue_a. Acknowledging...

As you see from the output the message {"id":"a","hello":"world"} was not re-queued on queue_b after consumer crash.

weyoss commented 5 years ago

A new release (v1.0.22) is out.

mmsqe commented 5 years ago

Sorry that issue has been actually fixed, I didn't delete previous appendonly.aof file mounted from docker container to my host.