weyoss / redis-smq

A simple high-performance Redis message queue for Node.js.
MIT License

Consuming messages from a priority queue #71

Closed PhantomRay closed 2 years ago

PhantomRay commented 2 years ago

Hi there

const { Consumer, Message, Producer } = require('redis-smq');

const consumer = new Consumer();

const messageHandler = (msg, cb) => {
  const payload = msg.getBody();
  console.log('handler1: ', payload);
  cb(); // acknowledging the message
};

// the second parameter is for enabling priority queuing
consumer.consume('queue1', false, messageHandler, (err, isRunning) => {
  if (err) console.error(err);
  // the message handler will be started only if the consumer is running
  else {
    console.log(
      `handler1 has been registered. Running status: ${isRunning}`,
    );
  } // isRunning === false
});

const anotherMessageHandler = (msg, cb) => {
  const payload = msg.getBody();
  console.log('handler2: ', payload);
  // ...
  cb();
};

consumer.consume(
  'queue2',
  false,
  anotherMessageHandler,
  (err, isRunning) => {
    if (err) console.error(err);
    // the message handler will be started only if the consumer is running
    else {
      console.log(
        `handler2 has been registered. Running status: ${isRunning}`,
      );
    }
  },
);

consumer.run();

// -------------------------------------------------

const message1 = new Message();
message1
  .setBody({ hello: 'world1' })
  .setTTL(3600000) // in millis
  .setQueue('queue1');

const message2 = new Message();
message2
  .setBody({ hello: 'world2' })
  .setTTL(3600000) // in millis
  .setQueue('queue2');

const producer = new Producer();
producer.produce(message1, (err) => {
  if (err) console.log(err);
  else {
    const msgId = message1.getId(); // string
    console.log('Successfully produced. Message ID is ', msgId);
  }
});

producer.produce(message2, (err) => {
  if (err) console.log(err);
  else {
    const msgId = message2.getId(); // string
    console.log('Successfully produced. Message ID is ', msgId);
  }
});

In the code above, neither queue is a priority queue. The output is always:

handler1 has been registered. Running status: false
handler2 has been registered. Running status: false
Successfully produced. Message ID is  8ce2fca4-5e41-49cb-9de6-cd3eedcbaad0
Successfully produced. Message ID is  69b7f2e1-a532-4823-a4b4-88f1a42102e5
handler1:  { hello: 'world1' }
handler2:  { hello: 'world2' }

If I change the second one to a priority queue, consumer.consume('queue2', true, anotherMessageHandler,..., and run it a few times, I always get this result:

handler1 has been registered. Running status: false
handler2 has been registered. Running status: false
Successfully produced. Message ID is  736395df-bd62-4762-8176-5fb96d857cbc
Successfully produced. Message ID is  ce50472d-2926-4f25-9d7a-bd19ed6b5e52
handler1:  { hello: 'world1' }

If I set priority back to false for the second queue, I get this output:

handler1 has been registered. Running status: false
handler2 has been registered. Running status: false
Successfully produced. Message ID is  1f14a3f4-71ee-4eec-8d4b-938b0883c4a9
Successfully produced. Message ID is  c8f6e2c7-e497-4902-88d5-78f38c7d2ab3
handler1:  { hello: 'world1' }
handler2:  { hello: 'world2' }
handler2:  { hello: 'world2' }
handler2:  { hello: 'world2' }
handler2:  { hello: 'world2' }
handler2:  { hello: 'world2' }
handler2:  { hello: 'world2' }
handler2:  { hello: 'world2' }
handler2:  { hello: 'world2' }

My question is, when I use consumer.consume('queue2', true, anotherMessageHandler,..., why can't I consume the messages from the second queue?

weyoss commented 2 years ago

If I change the second one to a priority queue, consumer.consume('queue2', true, anotherMessageHandler,..., and run it a few times, I always get this result:

You have enabled priority queuing for the message handler on queue2. Great! But no messages with a priority have been produced.

PhantomRay commented 2 years ago

I always thought by enabling that, messages will pop out from the queue based on their priority. Messages with the same priority will pop out LIFO.

But why does the queue hold the messages? Can you please explain?

weyoss commented 2 years ago

I always thought by enabling that, messages will pop out from the queue based on their priority.

Based on what priority? Have you set the priority for the message? Please read the docs carefully.

To set a priority for a message, use Message.prototype.setPriority().

From that point on, the message is enqueued in a priority queue. Messages without a priority cannot be consumed by a handler that has priority queuing enabled.
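
To illustrate the point, here is a toy in-memory sketch. It is not redis-smq code and makes no claim about how the library stores messages internally. It only assumes that each queue keeps messages with a priority apart from messages without one, which matches the behaviour described in this thread. ToyQueue and its methods are hypothetical names made up for the example.

```javascript
// Toy in-memory model, NOT redis-smq's implementation.
// Assumption: each queue has two separate stores, one for messages
// produced without a priority and one for messages produced with a priority.
class ToyQueue {
  constructor() {
    this.plain = [];       // messages produced without a priority
    this.prioritized = []; // messages produced with a priority
  }

  // Route the message by whether a priority was set on it.
  produce(body, priority = null) {
    if (priority === null) {
      this.plain.push({ body });
    } else {
      this.prioritized.push({ body, priority });
      // In this toy model a lower number is more urgent (an arbitrary choice).
      this.prioritized.sort((a, b) => a.priority - b.priority);
    }
  }

  // A handler with priority queuing enabled reads only the prioritized
  // store; a handler without it reads only the plain store.
  consume(usePriorityQueuing) {
    const store = usePriorityQueuing ? this.prioritized : this.plain;
    const msg = store.shift();
    return msg ? msg.body : null;
  }
}

const queue2 = new ToyQueue();

queue2.produce({ hello: 'world2' });     // no priority set, as in the report
const missed = queue2.consume(true);     // priority handler finds nothing

queue2.produce({ hello: 'urgent' }, 1);  // priority set on the message
const received = queue2.consume(true);   // priority handler now gets it

const leftover = queue2.consume(false);  // the plain message was never lost

console.log(missed, received, leftover);
```

In the reported code, this corresponds to message2 being produced without a priority while the handler on queue2 was registered with the second parameter set to true, so the handler had nothing to read.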

weyoss commented 2 years ago

See https://github.com/weyoss/redis-smq/blob/master/docs/priority-queues.md

PhantomRay commented 2 years ago

I misunderstood

weyoss commented 2 years ago

I'm glad you figured it out.

weyoss commented 2 years ago

Closing as resolved.