weyoss / redis-smq

A simple high-performance Redis message queue for Node.js.
MIT License
588 stars 64 forks

Checking if a message handler exists #75

Closed PhantomRay closed 2 years ago

PhantomRay commented 2 years ago

Hi there. My consumer may have many message handlers for different queues. When setting up new message handlers, errors might occur because the handlers already exist.

So I want to check whether a message handler already exists before adding a new one. Is it possible to expose getMessageHandler, or is there a better approach?

Plus, if I have multiple instances of the same app which use consumers, they actually won't know each other's messageHandler? In that case, is it possible to find out whether any consumer is consuming a particular queue (and ns)?

weyoss commented 2 years ago

Thank you for opening this issue.

The Node.js way of dealing with errors is the best way to know if a message handler exists or has been registered successfully.

For simplicity, the Consumer provides only ONE method for registering message handlers: consume(). Under the hood, consume() calls getMessageHandler() to check whether a message handler exists for the given queue, and returns an error if one does. Using the getMessageHandler() method, which is an internal method, in your application would be redundant.

As an improvement, a MessageHandlerAlreadyExistsError error should be returned instead of a plain error. So in your application you would check it like:

consumer.consume(queue, usePriorityQueuing, messageHandler, (err) => {
  if (err) {
    if (err instanceof MessageHandlerAlreadyExistsError) {
      // A message handler already exists
      // ...
    } else {
      // An error has occurred while running your message handler
      // ...
    }
  }
});

weyoss commented 2 years ago

Plus, if I have multiple instances of the same app which use consumers, they actually won't know each other's messageHandler?

Consumers are run independently of each other. Message handlers are registered per consumer/queue/priority.

In that case, is it possible to find out whether any consumer is consuming a particular queue (and ns)?

consumer.getQueues() allows you to retrieve the queue list (with namespaces) from which the consumer instance is consuming messages. See getQueues().
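
As a rough illustration of the getQueues() lookup described above, the sketch below checks whether one consumer instance is consuming a given queue and namespace. The fakeConsumer object, the isConsuming helper, and the shape of each entry ({ queue: { name, ns }, usePriorityQueuing }) are assumptions made for illustration and are not confirmed by this thread; check the getQueues() reference for the actual return type.

```javascript
// Hypothetical stand-in for a redis-smq Consumer so the sketch runs without Redis.
// Assumption: getQueues() returns entries shaped like
// { queue: { name, ns }, usePriorityQueuing }.
const fakeConsumer = {
  getQueues() {
    return [
      { queue: { name: 'orders', ns: 'shop' }, usePriorityQueuing: false },
      { queue: { name: 'emails', ns: 'shop' }, usePriorityQueuing: true },
    ];
  },
};

// Hypothetical helper: true when this consumer instance has registered
// a message handler for the given queue name and namespace.
function isConsuming(consumer, name, ns) {
  return consumer
    .getQueues()
    .some((entry) => entry.queue.name === name && entry.queue.ns === ns);
}

console.log(isConsuming(fakeConsumer, 'orders', 'shop')); // true
console.log(isConsuming(fakeConsumer, 'orders', 'other')); // false
```

Since consumers run independently of each other, a check like this only answers the question for the instance it is called on, not for other instances of the same app.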

PhantomRay commented 2 years ago

All noted. Thanks. I'm currently trying to extend ConsumerFrontend and have to access the private consumer. Is it possible to add a function to access it? I want to do this because I would like to force consumption if a message handler already exists. Basically, cancel the existing one first.

There are many other scenarios I may need to access this.consumer directly.

weyoss commented 2 years ago

You are doing something wrong and you should have no need to access the private property consumer.

I'm currently trying to extend ConsumerFrontend and have to access the private consumer. Is it possible to add a function to access it?

No.

I want to do this because I would like to force consumption if a message handler already exists. Basically, cancel the existing one first.

Sounds to me like you have a bad pattern in your implementation. That is not a clean path to take. I remember that last time you wanted to create a new consumer instance for each websocket connection, which sounded crazy to me :)

consumer.consume(queue, usePriorityQueuing, messageHandler, (err) => {
  if (err) {
    if (err instanceof MessageHandlerAlreadyExistsError) {
      // CANCEL IT HERE.
    } else {
      // ...
    }
  }
});

There are many other scenarios I may need to access this.consumer directly.

You should have no need to directly access a private property. That is why it is a private property.

Please use RedisSMQ as it is intended to be used and do not violate the public contract the library provides. Otherwise no support for misuse will be provided.

Thank you.

PhantomRay commented 2 years ago

Last time, I used a consumer per websocket connection because I didn't know that each consumer uses one Redis connection. That's why I mentioned Redis connection pooling.

To improve on that, I want to use only one consumer per service (one Redis connection) to handle multiple websocket connections (one consumer with multiple message handlers, thanks to your earlier recommendation).

There might be cases where a websocket connection tries to get messages from a queue while another active websocket connection is already consuming the same queue.

To prevent a consumer from trying to add the same message handler twice, I will have to cancel the existing one and then consume. I don't like putting a consume inside another consume. So I ended up wrapping redis-smq's Consumer inside a custom consumer (instead of extending the class), and then using getQueues to check whether the message handler exists.
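
A minimal sketch of the wrapper approach described above, under stated assumptions: FakeConsumer is a hypothetical in-memory stand-in for redis-smq's Consumer (plain string queue names, callbacks invoked synchronously), and ReplacingConsumer and hasHandler are illustrative names that do not come from the library. Only consume(), cancel() and getQueues() are mentioned in this thread, and the entry shape returned by getQueues() here is an assumption.

```javascript
// Hypothetical in-memory stand-in for redis-smq's Consumer (no Redis involved).
class FakeConsumer {
  constructor() {
    this.handlers = new Map(); // queue name -> { usePriorityQueuing, handler }
  }
  consume(queue, usePriorityQueuing, handler, cb) {
    if (this.handlers.has(queue)) return cb(new Error('handler already exists'));
    this.handlers.set(queue, { usePriorityQueuing, handler });
    cb();
  }
  cancel(queue, usePriorityQueuing, cb) {
    this.handlers.delete(queue); // no error when nothing is registered
    cb();
  }
  getQueues() {
    // Assumed entry shape: { queue, usePriorityQueuing }
    return [...this.handlers].map(([queue, h]) => ({
      queue,
      usePriorityQueuing: h.usePriorityQueuing,
    }));
  }
}

// Wraps a consumer (composition, not inheritance) and replaces an existing
// handler instead of failing when one is already registered for the queue.
class ReplacingConsumer {
  constructor(consumer) {
    this.consumer = consumer;
  }
  hasHandler(queue) {
    return this.consumer.getQueues().some((entry) => entry.queue === queue);
  }
  consume(queue, usePriorityQueuing, handler, cb) {
    const register = () =>
      this.consumer.consume(queue, usePriorityQueuing, handler, cb);
    if (!this.hasHandler(queue)) return register();
    // A handler exists: cancel it first, then register the new one.
    this.consumer.cancel(queue, usePriorityQueuing, (err) =>
      err ? cb(err) : register()
    );
  }
}

const inner = new FakeConsumer();
const wrapped = new ReplacingConsumer(inner);
const firstHandler = (msg, done) => done();
const secondHandler = (msg, done) => done();
const errors = [];

// Calling back-to-back is safe here only because the stand-in is synchronous.
wrapped.consume('notifications', false, firstHandler, (err) => errors.push(err));
wrapped.consume('notifications', false, secondHandler, (err) => errors.push(err));

console.log(errors.every((e) => e === undefined)); // true
console.log(inner.handlers.get('notifications').handler === secondHandler); // true
```

With a real consumer the callbacks are asynchronous, so the second registration would need to wait for the first callback before it runs.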

weyoss commented 2 years ago

Added MessageHandlerAlreadyExistsError in v6.2.3.

Regarding message handler deletion, you can also make sure that no message handler exists before registering a new one:

const callback = (err) => {
  if (err) {
    console.log(err);
  } else {
    consumer.consume(queue, usePriorityQueuing, messageHandler, (err) => {
      // ...
    });
  }
};

// Does not return an error if the queue has no message handler (starting with redis-smq@6.2.3)
consumer.cancel(queue, usePriorityQueuing, callback);
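
For readers who would rather not nest one callback inside another, the same cancel-then-consume sequence can be sketched with promises. This is only an illustration: replaceHandler and createStubConsumer are hypothetical names, the stub stands in for a real consumer, and the sketch assumes cancel() and consume() take a Node-style callback as their last argument, as in the snippets in this thread.

```javascript
const { promisify } = require('util');

// Hypothetical stub with the same callback-style consume()/cancel() shape
// used in this thread, so the sketch runs without Redis.
function createStubConsumer() {
  const handlers = new Map();
  return {
    handlers,
    consume(queue, usePriorityQueuing, handler, cb) {
      if (handlers.has(queue)) return cb(new Error('handler already exists'));
      handlers.set(queue, handler);
      cb();
    },
    cancel(queue, usePriorityQueuing, cb) {
      handlers.delete(queue); // no error when the queue has no handler
      cb();
    },
  };
}

// Hypothetical helper: cancel any existing handler, then register the new one.
async function replaceHandler(consumer, queue, usePriorityQueuing, messageHandler) {
  const cancel = promisify(consumer.cancel.bind(consumer));
  const consume = promisify(consumer.consume.bind(consumer));
  await cancel(queue, usePriorityQueuing);
  await consume(queue, usePriorityQueuing, messageHandler);
}

const stubConsumer = createStubConsumer();
const oldHandler = (msg, done) => done();
const newHandler = (msg, done) => done();
stubConsumer.handlers.set('notifications', oldHandler);

replaceHandler(stubConsumer, 'notifications', false, newHandler)
  .then(() => console.log('handler replaced'))
  .catch((err) => console.log(err));
```

Any error from either step rejects the returned promise, so a single catch covers both the cancellation and the registration.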

weyoss commented 2 years ago

Closing as resolved.