cody-greene / node-rabbitmq-client

RabbitMQ (0-9-1) client library with auto-reconnect & zero dependencies
MIT License

Consumer deferred acknowledgements #60

Open livecloud-labs opened 4 weeks ago

livecloud-labs commented 4 weeks ago

Hey! Great library. Been using this for a while now in production and really pleased with it!

I'm in the process of streamlining a service that is responsible for running DB inserts. Currently, the service uses BullMQ with workers processing messages individually, which puts unnecessary strain on the DB instance.

Ideally, I'd like to move this process to RMQ. The idea is to buffer incoming messages, then flush when either a max batch size or a timeout is hit. The current consumer acknowledgement functionality provides a way to handle ack'ing/re-queueing/dropping messages, but there doesn't appear to be a way to defer acknowledgement from the consumer, unless I'm missing something?

Is this something you might be interested in implementing? At first glance, it might look something like this...

private async _execHandler(msg: AsyncMessage) {
  // ... existing handler invocation, producing retval ...

  if (!this._props.noAck) {
    if (retval === ConsumerStatus.DROP) {
      ch.basicNack({deliveryTag: msg.deliveryTag, requeue: false})
      ++this.stats.dropped
    } else if (retval === ConsumerStatus.REQUEUE) {
      ch.basicNack({deliveryTag: msg.deliveryTag, requeue: true})
      ++this.stats.requeued
    } else if (retval === ConsumerStatus.DEFER) {
      // defer acknowledgement; the user acks later via consumer.ack()
      return
    } else {
      ch.basicAck({deliveryTag: msg.deliveryTag})
      ++this.stats.acknowledged
    }
  }
}

// Then, elsewhere...

consumer.ack({...})
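
To make the intent concrete, here's roughly how I'd use it for the buffer/flush case. This is only a sketch: ConsumerStatus.DEFER and consumer.ack() are the proposed additions, and insertRows is a stand-in for the actual DB write.

import {Connection, ConsumerStatus} from 'rabbitmq-client'
import type {AsyncMessage} from 'rabbitmq-client'

const rabbit = new Connection('amqp://guest:guest@localhost:5672')
const BATCH_SIZE = 100
const FLUSH_DELAY = 5000 // ms

// stand-in for the real DB write
async function insertRows(rows: unknown[]): Promise<void> { /* INSERT ... */ }

let buffer: AsyncMessage[] = []
let timer: NodeJS.Timeout | undefined

async function flush() {
  if (timer) { clearTimeout(timer); timer = undefined }
  const batch = buffer
  buffer = []
  if (!batch.length) return
  await insertRows(batch.map(msg => msg.body))
  // proposed API: ack each deferred message once the insert succeeds
  for (const msg of batch)
    consumer.ack({deliveryTag: msg.deliveryTag})
}

const consumer = rabbit.createConsumer({
  queue: 'db-inserts',
  qos: {prefetchCount: BATCH_SIZE} // prefetch must cover the batch size
}, async (msg) => {
  buffer.push(msg)
  if (buffer.length >= BATCH_SIZE) await flush()
  else if (!timer) timer = setTimeout(() => { void flush() }, FLUSH_DELAY)
  return ConsumerStatus.DEFER // proposed: skip the automatic ack
})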

Any thoughts?

cody-greene commented 4 weeks ago

I can see the need for this. Manual acks create a bit of a safety issue, though. I'd like to provide an API specifically for batched consumers; otherwise people will re-implement the same thing on top of this manual ack capability, with mixed results.

Perhaps a BatchConsumer class, with options for batchSize (the maximum number of messages to accumulate before handing off to the callback) and batchDelay (the maximum period to accumulate messages).
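
From the caller's side it might look something like this (createBatchConsumer and the handler signature are just a strawman, reusing names from your sketch above):

// Strawman only: createBatchConsumer and this handler signature don't exist yet
const sub = rabbit.createBatchConsumer({
  queue: 'db-inserts',
  batchSize: 100,  // flush after this many messages...
  batchDelay: 5000 // ...or after this many ms, whichever comes first
}, async (messages: AsyncMessage[]) => {
  await insertRows(messages.map(m => m.body))
  // returning normally acks the whole batch in one go; an error (or a
  // status return) could requeue/drop it, TBD
})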

livecloud-labs commented 4 weeks ago

Right, good call. A new API would make way more sense here.

It's something I need to implement anyway, so I'm happy to pick this up and have a crack at doing it properly rather than hacking something together specifically for my use case. It might take a while to find the time to get to grips with the codebase, but I've been using the lib for a while, so it would be good to contribute back. LMK

cody-greene commented 4 weeks ago

I will be rather busy for the next two weeks, so absolutely take a crack at it. I suggest you focus on designing the outward interface/API first. Try writing some example code against the new interface and see if anything is missing from how you would actually use it.

Like partial failures: can we nack some messages in a batch? And concurrency: should we process one batch at a time? How are errors surfaced?

Implementation-wise, we will want to send a single BasicAck(multiple=true) for each batch, assuming it wasn't a partial failure. And we'll have to make sure the lifecycle of the batchDelay timer is managed properly, cleaned up, etc.
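
Roughly something like this, though all the names are hypothetical and error handling / partial failures are left out:

import type {AsyncMessage, Channel} from 'rabbitmq-client'

class BatchConsumer {
  private _batch: AsyncMessage[] = []
  private _timer?: NodeJS.Timeout

  constructor(
    private _ch: Channel,
    private _batchSize: number,
    private _batchDelay: number,
    private _handler: (msgs: AsyncMessage[]) => Promise<void>,
  ) {}

  // called for each delivery instead of acking immediately
  private _onMessage(msg: AsyncMessage): void {
    this._batch.push(msg)
    if (this._batch.length >= this._batchSize) {
      void this._flush()
    } else if (this._timer == null) {
      // arm the batchDelay timer only once per batch
      this._timer = setTimeout(() => void this._flush(), this._batchDelay)
    }
  }

  private async _flush(): Promise<void> {
    // always clear the timer so it can't fire against an already-flushed batch
    if (this._timer != null) {
      clearTimeout(this._timer)
      this._timer = undefined
    }
    const batch = this._batch
    this._batch = []
    if (batch.length === 0) return
    await this._handler(batch)
    // one frame acks everything up to the highest deliveryTag; "multiple"
    // is the standard basic.ack field, assuming basicAck accepts it
    const last = batch[batch.length - 1]
    this._ch.basicAck({deliveryTag: last.deliveryTag, multiple: true})
  }
}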

I'm not sure yet whether we can subclass Consumer, or whether deeper changes to Channel and so on are needed.
