cody-greene / node-rabbitmq-client

RabbitMQ (0-9-1) client library with auto-reconnect & zero dependencies
MIT License

Publisher flow control #59

Open cody-greene opened 1 month ago

cody-greene commented 1 month ago

Let's say I want to maximize publisher throughput but still have publisher confirms enabled. Waiting for a confirm after each publish is safe, but very slow. One approach is to publish in batches, say 100 messages, then wait for confirmations. The server can return a single BasicAck(multiple=true) for a whole batch.
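To make the batch approach concrete, here is a minimal sketch. It does not use this library's API: `publishInBatches` and `sendConfirmed` are hypothetical names, and `sendConfirmed(msg)` is assumed to be any function that publishes one message and returns a promise that settles when the broker confirms it.

```javascript
// Hypothetical helper, not part of rabbitmq-client.
// `sendConfirmed(msg)` is assumed to resolve once the broker confirms `msg`.
async function publishInBatches(messages, batchSize, sendConfirmed) {
  for (let start = 0; start < messages.length; start += batchSize) {
    const batch = messages.slice(start, start + batchSize)
    // Write the whole batch without waiting in between,
    // then wait until every message in the batch is confirmed.
    await Promise.all(batch.map((msg) => sendConfirmed(msg)))
  }
}
```

The drawback is that the socket sits idle at the end of each batch while the last confirmations arrive, which is what the pipelining approach avoids.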

Another approach is pipelining, where confirmations are handled asynchronously and the number of unconfirmed messages is kept at or below some configurable limit. Say, for example, maxUnconfirmed=5. Then I can push five messages through the TCP socket, but I need to receive at least one confirmation before I can send the sixth message.
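A rough sketch of the pipelining idea, independent of this library: `publishPipelined` and `sendConfirmed` are hypothetical names, and `sendConfirmed(msg)` is assumed to return a promise that settles on the broker's confirm. Error handling is deliberately left out; a rejected confirm simply propagates to the caller.

```javascript
// Hypothetical helper, not part of rabbitmq-client.
// `sendConfirmed(msg)` is assumed to resolve once the broker confirms `msg`.
async function publishPipelined(messages, maxUnconfirmed, sendConfirmed) {
  const inFlight = new Set()
  for (const msg of messages) {
    // At the limit: wait for at least one confirm before sending more.
    while (inFlight.size >= maxUnconfirmed) {
      await Promise.race(inFlight)
    }
    // Track the message until its confirm arrives, then free the slot.
    const confirm = sendConfirmed(msg).finally(() => inFlight.delete(confirm))
    inFlight.add(confirm)
  }
  // Drain the remaining confirmations.
  await Promise.all(inFlight)
}
```

Unlike batching, a new message goes out as soon as any single confirmation frees a slot, so the connection stays busy.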

Possible API changes to consider:

  1. If pub = createPublisher({ confirm: true, maxUnconfirmed: 5 }) then pub.send() will block while maxUnconfirmed is reached. The promise will still only resolve when a confirm is received. It is possible for this action to time out before the message is ever sent.
  2. If pub = createPublisher({ confirm: 'pipeline', maxUnconfirmed: 5 }) then pub.send() will block while maxUnconfirmed is reached, but otherwise returns without waiting for a confirmation. Rejections will surface as error events on the publisher object, and should include a reference to the envelope/payload. If retries are enabled, then rejected messages should be retried asynchronously.
     2a. Or use pub.send({pipeline: true}, ...) to allow this behavior at a more granular level.
     2b. Or use pub.sendPipelined(...)
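The difference between options 1 and 2 could look roughly like the sketch below. Everything here is hypothetical and only illustrates the proposed semantics: `createSketchPublisher` is not the library's `createPublisher`, `sendConfirmed(envelope, body)` is assumed to return a promise that settles on the broker's confirm, and `onError` stands in for the publisher's error event. Timeouts and retries are not modeled.

```javascript
// Hypothetical sketch of the two proposed modes; not the library's implementation.
function createSketchPublisher({ confirm, maxUnconfirmed }, sendConfirmed, onError) {
  let unconfirmed = 0
  const waiters = []

  // Resolve immediately if a slot is free, otherwise queue until one is released.
  const acquire = () => {
    if (unconfirmed < maxUnconfirmed) {
      unconfirmed++
      return Promise.resolve()
    }
    return new Promise((resolve) => waiters.push(resolve))
  }

  // Free a slot, or hand it directly to the next blocked send().
  const release = () => {
    const next = waiters.shift()
    if (next) next()
    else unconfirmed--
  }

  return {
    async send(envelope, body) {
      // Both modes block here while maxUnconfirmed is reached.
      await acquire()
      const confirmed = sendConfirmed(envelope, body).finally(release)
      if (confirm === 'pipeline') {
        // Option 2: return once the message is written;
        // rejections are reported out of band with the envelope and payload.
        confirmed.catch((err) => onError(err, envelope, body))
        return
      }
      // Option 1: resolve (or reject) only when the confirm arrives.
      await confirmed
    },
  }
}
```

With `confirm: true` a caller that awaits every `send()` is limited to one message in flight, so `maxUnconfirmed` only matters when sends are issued concurrently. With `confirm: 'pipeline'` a plain `await pub.send(...)` loop gets the flow control for free.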

Example:

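import {Connection} from 'rabbitmq-client'

// connection URL is a placeholder
const rabbit = new Connection('amqp://guest:guest@localhost:5672')

// maxUnconfirmed and pipeline are the options proposed above, not an existing API
const pub = rabbit.createPublisher({
  confirm: true,
  maxUnconfirmed: 100
})

for (let i = 0; i < 100_000; ++i) {
  // automatic flow control; up to 100 messages in flight
  await pub.send({routingKey: 'foo', pipeline: true}, {id: i})
}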