Closed quan612 closed 11 months ago
I was trying the fanout. But I may have misconfigured, currently only 1 node get the expected message.
Its on 7.2.3
Here are some part of code:
class TestSmq {
private smqQueue
private smqFanoutExchange
private smqProducer
private smqConsumer
constructor() {
this.smqQueue = promisifyAll(QueueManager)
this.smqFanoutExchange = promisifyAll(FanOutExchangeManager)
this.smqProducer = promisifyAll(new Producer(config))
this.smqConsumer = promisifyAll(new Consumer(config))
this.createQueue()
}
createQueue = async () => {
const queueManagerAsync = promisifyAll(await this.smqQueue.createInstanceAsync(config))
const queueAsync = promisifyAll(queueManagerAsync.queue)
// Before producing and consuming messages to/from a given queue, we need to make sure that such queue exists
const exists = await queueAsync.existsAsync('test-smq')
if (!exists) {
// Creating a queue (a LIFO queue)
await queueAsync.saveAsync('test-smq', EQueueType.FIFO_QUEUE)
}
}
produceFanout = async (message: string) => {
await this.smqProducer.runAsync()
const msg = new Message() //ProducibleMessage()
const exchange = new FanOutExchange('test-smq')
const fanoutManagerAsync = promisifyAll(
await this.smqFanoutExchange.createInstanceAsync(config),
)
await promisifyAll(fanoutManagerAsync.bindQueueAsync('test-smq', exchange))
msg.setExchange(exchange).setBody(message)
await this.smqProducer.produceAsync(msg)
}
consumerFanout = async () => {
// starting the consumer and then registering a message handler
await this.smqConsumer.consumeAsync(
'test-smq',
(message: any, cb: any) => {
console.log(`*****************************Got message to consume: `)
console.log(message?.body)
cb()
},
)
await this.smqConsumer.runAsync()
}
}
So i have this exact same code running on 2 servers.
Either 1 of them will get a message from somewhere else then will call this:
await SmqService.produceFanout("A Test Message")
But in the end: it only logs the message once.
May I know how to have the message returned by both servers.
+++++++++++++++++++++++++++++ On debug log: I got this message in one of the instance
In previous code snippet I was using the same queue name. If I try to assign a different queue name per app / node server then I can see the message is broadcasted to both.
Not sure if it is the right approach, since I generate a random id for queue name, so per deployment need to clean up the old ones
@quan612 Thank you for opening this issue.
Exchanges in RedisSMQ are a way to publish a message to one or many queues. Consumers on the other side do not know about the existence of such thing as an exchange.
Direct exchanges
A direct exchange allow to publish a message to one queue which is identified by its parameters (name/ns).
message (1) ──────► queue2 (2.2) ──────► consumer2 (3.2)
(1): The queue where the message is going to be published is set using message.setQueue('a') (2): Before publishing a message, we need to create a queue using queue.save('a', EQueueType.LIFO_QUEUE, cb) (3): to consume a message from a queue the consumer instance uses consumer.consume('a', messageHandler, cb)
Topic exchanges
A topic exchange allow to publish a message to many queues based on a topic pattern.
┌──────► queue1 (2.1) ──────► consumer1 (3.1)
│
│
message (1) ────►├──────► queue2 (2.2) ──────► consumer2 (3.2)
│
│
└──────► queue3 (2.3) ──────► consumer3 (3.3)
(1): The message is going to be published based on a topic pattern for example a.b
using message.setTopic('a.b')
(2): Queues are created as usually using queue.save(). All the queue must match the pattern a.b
:
(3): Each queue as usually as one or many consumers:
Fan Out exchanges
A FanOut exchange allow to publish a message to many queues which are bound to the same FanOut exchange. Here we are not talking about message delivery. Please pay attention that we are still in the same concept as before: publishing a message to one or many queues.
So FanOut exchanges are used in the context of publishing and not delivering a message.
┌──────► queue1 (3.1) ──────► consumer1 (4.1)
│
│
message (1) ────► FanOut1 (2) ────►├──────► queue2 (3.2) ──────► consumer2 (4.2)
│
│
└──────► queue3 (3.3) ──────► consumer3 (4.3)
(1): The message is going to be published based on a FanOut pattern for example my-fanout
which is set using message.setFanOut('my-fanout')
(2): Before publishing the message the FanOut exchange my-fanout
must be first created.
(3): Queues are created as usually. Once created the given queues must be bound to the FanOut exchange my-fanout
which has been previously saved.
(4): Consumers as usually consume messages from the listed queues.
Differences between Topic and FanOut exchanges
The FanOut exchanges are mainly are the same as Topic Exchanges. But the only difference between them is that Topic Exchanges allow to publish dynamically a message to many queues based on a pattern
. Whereas FanOut exchanges allow to publish a messages only to those queues which have been bound beforehand.
@quan612 So in your case you either go with Topic Exchanges or FanOut Exchanges.
If you go with topics:
1\ Each app
use its own queue for example app1 uses my-queue.app1
, app2 uses my-queue.app2
, and so on.
2\ Publish a message with the topic my-queue
(message.setTopic('my-queue')).
If you go with fan-outs:
1\ Each app
use its own queue for example app1 uses my-app1-queue
, app2 uses my-app2-queue
, and so on.
2\ Create a FanOut my-fanout
3\ Bind both my-app1-queue
and my-app2-queue
to my-fanout
FanOut
4\ Publish a message with the my-fanout
FanOut (message.setFanOut('my-fanout')).
While answering your question I got inspired by an idea.
As I explained, currently exchanges are applied only when publishing a message.
RedisSMQ v8 is in active development and I may consider refactoring exchanges in order to apply them both when publishing and delivering messages.
In other words, we may set up RedisSMQ to deliver a message to all consumers which are consuming messages from a given queue.
It sounds like a nice feature to have! Keeping this issue open until further updates. But for now, the way the exchanges work is more than enough in most situations.
In other words, we may set up RedisSMQ to deliver a message to all consumers which are consuming messages from a given queue.
After a short evaluation I have came to the conclusion that this feature has nothing to do with exchanges and should be considered separately and without any relation to exchanges.
In fact this feature is purely pub/sub pattern but with persistence. An new issue for this feature has been created.
thanks @weyoss for a very details explanation.
I got it works using assigning a different queue name per consumer, and having the producer runs on a different machine (its not necessary but easier to debug)
I misunderstood the configuration previously due to coming from a config of RabbitMQ I worked on several years ago. In that particular Fan Out of RabbitMQ configuration, we were able to spin up a new consumer machine "blindly" without having to give a unique identifier to an consumer (queue name), whereas in here we have to.
Sorry for any misunderstanding. Thanks again.
Hi,
Thanks for developing this library. Just wondering if I can confirm this use case.
I have multiple node apps running that I need to consume the same message published from somewhere else. Something like this (like app 1 received a message from another service and need to "broadcast" to itself and app 2, or vice versa)
app (1) / (2) running behind a load balancer of some sort, and when either (1) or (2) received a message ~ current from AWS SNS, I need to fan out from app 1 to itself, and app 2, or 3 and 4, so on if we need more.....
So I thought that I can have the redis-smq running on both app 1 and 2, having setup the same fanout exchange and then just fan out produce this message, thus both (1) and (2) now can receive the original message from external app.
Another way I knew of is using nature pub sub from redis, but I just hope to use a message broker here so that I can check later if an event is missing or something....