cody-greene / node-rabbitmq-client

RabbitMQ (0-9-1) client library with auto-reconnect & zero dependencies
MIT License
130 stars 9 forks source link

Consumer handler `reply()` `envelope.routingKey` and `envelope.exchange` conflict #42

Closed MarwanAlsoltany closed 2 months ago

MarwanAlsoltany commented 9 months ago

Problem

The behavior of the reply() callback passed to the ConsumerHandler has changed between v4.4.0 and v4.5.0. A change was introduced to allow for overriding message envelope properties (correlationId to be specific). Which resulted in message replies being routed wrongfully due to using both the routingKey and the exchange for routing the message (as with any normal publisher).

Cause

The code responsible for this issue can be found here v4.4.0 - v4.5.0 Diff

Diagnoses

In RabbitMQ, a message can be published to queue either through an exchange that is bound to a queue using some routingKey. Or a message with routingKey that matches a queue name exactly.

Previously, the exchange was set to an empty string (always overwritten explicitly), which made the routingKey the only property responsible for routing the message. Starting from v4.5.0, this property assignment was dropped out, which resulted in the following side-effect:

Fix

Revert the change made in v4.5.0 and add { exchange: '' } again so that the { routingKey: req.replyTo } is the only field used for routing.

If you don't have the time to fix the issue. I am more than happy to make a PR.


BTW, thanks for the package, I really enjoy working with it. You nailed the API design.

cody-greene commented 9 months ago

While direct reply is the default as a conterpart to the RPC interface, it is conceivable that somebody may want to use a fanout exchange to send a response to multiple interested queues. Or use other exchange types for whatever reason.

For example, here is a weird little worker which broadcasts replies through a fanout exchange:

const sub = rabbit.createConsumer({
  queue: 'broadcaster'
}, async (msg, reply) => {
  // ... do something ...
  await reply('hello', {exchange: msg.headers['x-reply-to-exchange']})
})

const pub = rabbit.createPublisher({
  confirm: true,
  exchanges: [{exchange: 'splitter', type: 'fanout'}],
})

await pub.send({
  routingKey: 'broadcaster', 
  replyTo: 'whatever', 
  headers: {'x-reply-to-exchange': 'splitter'}
}, 'hello')

I must admit I'm struggling to come up with a practical use for this, but just the same I see no reason to consider this a bug in need of fixing.

MarwanAlsoltany commented 9 months ago

@cody-greene It is not necessarily a bug, but quite a niche need to break the API because of it.

The way I see it, if the need arises for the functionality you mentioned, then the reply() callback is not the right way to do it. The consumer should construct and use its own publisher instead.

FYI, the only reason I opened this issue is that this change broke the API by changing its behavior. The reply() should be documented better to outline which fields can be overwritten. I do not personally have an issue with the change, but it should've been at least tagged as v5.0.0.

Feel free to close the issue.

Have a nice day!

cody-greene commented 9 months ago

I agree it's better to use a separate publisher. Removing this now would definitely be a breaking change though. I'll add it to my list for 5.0