cody-greene / node-rabbitmq-client

RabbitMQ (0-9-1) client library with auto-reconnect & zero dependencies
MIT License

Publish doesn't await consumer ack #11

Closed Gielert closed 1 year ago

Gielert commented 1 year ago

First of all, thanks for the library! Though I'm having some trouble with awaiting acknowledgements.

I have a consumer:

const consumer = client.createConsumer(
  {
    queue,
    queueOptions: { exclusive: true },
    exchanges: [{ exchange, type: 'topic', autoDelete: true }],
    queueBindings: [{ exchange, routingKey }],
  },
  async (msg) => {
    await doSomeAsyncWork()
  }
)

And I have a publisher:

const publisher = client.createPublisher({
  confirm: true,
  exchanges: [{ exchange, type: 'topic', autoDelete: true }]
})

await publisher.publish({ exchange, routingKey }, "my-message")

The publish function resolves immediately after sending the message to the queue instead of waiting for the ack. Is there anything I'm missing? I've also tried acquiring a channel and calling await channel.confirmSelect(), but that resulted in the same behavior.

cody-greene commented 1 year ago

It looks like you're publishing before the queue exists. So the message is likely published and acknowledged, but it simply disappears since there's nowhere to place it. You can check this with the mandatory flag and the basic.return event when publishing.

publisher.on('basic.return', msg => console.log('unroutable:', msg))
await publisher.publish({mandatory: true, ...}, "my-message")
// unroutable: {
//   replyCode: 312,
//   replyText: 'NO_ROUTE',
//   exchange: 'my-exchange',
//   routingKey: 'my-topic',
//   contentType: 'text/plain',
//   deliveryMode: 1,
//   timestamp: 1679587185,
//   durable: false,
//   body: 'my-message'
// }

You can wait for the consumer's ready event before publishing:

// pick one
consumer.on('ready', ...)
consumer.once('ready', ...)
await new Promise(resolve => consumer.once('ready', resolve))

This is less of an issue when using durable (and exclusive=false) queues.
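The promise-based option above can be wrapped with a timeout so that a consumer that never becomes ready does not stall the publisher forever. A minimal sketch using only Node's built-in events module: waitForReady and its timeoutMs parameter are hypothetical names, not part of rabbitmq-client, and a plain EventEmitter stands in for the consumer so the example runs without a broker.

```javascript
const { EventEmitter, once } = require('node:events')

// Hypothetical helper (not part of rabbitmq-client): resolves when `emitter`
// fires 'ready', rejects with an AbortError if that takes longer than `timeoutMs`.
async function waitForReady(emitter, timeoutMs = 5000) {
  const ac = new AbortController()
  const timer = setTimeout(() => ac.abort(), timeoutMs)
  try {
    await once(emitter, 'ready', { signal: ac.signal })
  } finally {
    clearTimeout(timer)
  }
}

// Stand-in for a consumer (assumption: the real consumer is an EventEmitter
// that emits 'ready', as in the snippet above).
const fakeConsumer = new EventEmitter()
setTimeout(() => fakeConsumer.emit('ready'), 10)

waitForReady(fakeConsumer).then(() => {
  console.log('consumer ready, safe to publish')
})
```

Note that the helper only sees a 'ready' event fired after it starts listening, so it should be called right after the consumer is created.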

Gielert commented 1 year ago

Hmm. I'm not getting any unroutable errors when subscribing to basic.return. Also, I've added the mandatory flag and it still has the same behavior. My consumer still handles the messages that are sent, but the publish just doesn't await an ack. I'm pretty sure the queue exists before I'm actually publishing.

Gielert commented 1 year ago

Just some additional info:

await publisher.publish({ mandatory: true, exchange, routingKey }, "my-message")
publisher.on('basic.return', msg => console.log('unroutable:', msg))
console.log('post publish')
const consumer = client.createConsumer(
  {
    queue,
    queueOptions: { exclusive: true },
    exchanges: [{ exchange, type: 'topic', autoDelete: true }],
    queueBindings: [{ exchange, routingKey }],
  },
  async (msg) => {
    await doSomeAsyncWork()
    console.log('ack')
  }
)
publisher.on('ready', () => console.log('ready'))

This will result in the following output in my console:

ready
post publish
ack

cody-greene commented 1 year ago

Ahh there is a misunderstanding here. Publisher acknowledgements are independent of consumer acknowledgements.

From https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed (read the whole article, but here's the salient point):

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For quorum queues, this means that a quorum replicas have accepted and confirmed the message to the elected leader.

A consumer acknowledgement, on the other hand, means the message has been processed and can be removed from the queue.
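The ordering can be illustrated with a toy in-memory model. This is not how rabbitmq-client or the broker is implemented; ToyBroker and its publish, consume, and deliver methods are hypothetical names used only to show the sequence of events: the publisher's promise settles once the queue has accepted the message, while the consumer finishes its work (and acks) later.

```javascript
// Toy model only: names and behavior are illustrative assumptions,
// not the rabbitmq-client API or the AMQP wire protocol.
class ToyBroker {
  constructor() {
    this.queue = []      // messages accepted but not yet consumer-acked
    this.log = []        // ordered record of what happened
    this.handler = null
  }

  // Resolves as soon as the message is stored in the queue
  // (the analogue of a publisher confirm).
  async publish(body) {
    this.queue.push(body)
    this.log.push('publisher confirm')
    // Hand the message to the consumer later, on a separate tick.
    setTimeout(() => this.deliver(), 0)
  }

  consume(handler) {
    this.handler = handler
  }

  async deliver() {
    if (!this.handler || this.queue.length === 0) return
    await this.handler(this.queue[0])
    this.queue.shift()   // consumer ack: the message can now be removed
    this.log.push('consumer ack')
  }
}

async function demo() {
  const broker = new ToyBroker()
  broker.consume(async () => {
    await new Promise(resolve => setTimeout(resolve, 10))  // simulated async work
  })
  await broker.publish('my-message')
  broker.log.push('post publish')
  await new Promise(resolve => setTimeout(resolve, 100))   // let the consumer finish
  return broker.log
}

demo().then(log => console.log(log.join(' -> ')))
// prints: publisher confirm -> post publish -> consumer ack
```

This mirrors the console output earlier in the thread, where "post publish" appears before "ack".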

cody-greene commented 1 year ago

If you want request-response behavior, take a look at the RPC API, rabbit.createRPCClient(), which lets you wait for a consumer to "respond" to your "request".
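For intuition, the request-response pattern can be sketched in memory with a reply queue and a correlation id. This is a toy under stated assumptions, not the library's RPC implementation: bus, serve, and request are hypothetical names, and an EventEmitter stands in for the broker. See the library's own documentation for the actual createRPCClient() usage.

```javascript
const { EventEmitter } = require('node:events')
const { randomUUID } = require('node:crypto')

// Stand-in for the broker: one event name per "queue".
const bus = new EventEmitter()

// Hypothetical responder: handles each request and sends the result to the
// reply queue named in the request, echoing the correlation id.
function serve(queue, handler) {
  bus.on(queue, async ({ body, replyTo, correlationId }) => {
    const result = await handler(body)
    bus.emit(replyTo, { body: result, correlationId })
  })
}

// Hypothetical requester: resolves only after the responder has replied,
// or rejects if no reply arrives within `timeoutMs`.
function request(queue, body, timeoutMs = 1000) {
  return new Promise((resolve, reject) => {
    const correlationId = randomUUID()
    const replyTo = `reply.${correlationId}`
    const timer = setTimeout(() => {
      bus.removeAllListeners(replyTo)
      reject(new Error('RPC timeout'))
    }, timeoutMs)
    bus.once(replyTo, (msg) => {
      clearTimeout(timer)
      resolve(msg.body)
    })
    bus.emit(queue, { body, replyTo, correlationId })
  })
}

serve('work', async (body) => {
  await new Promise(resolve => setTimeout(resolve, 10))  // simulated async work
  return `done: ${body}`
})

request('work', 'my-message').then(res => console.log(res))
// prints: done: my-message
```

Unlike a publisher confirm, the promise returned by request() here settles only after the handler has finished its work.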

Gielert commented 1 year ago

Aaaah, it seems I misunderstood indeed. Thanks for the fast response! I'll check out the RPC api!