wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
772 stars, 126 forks

Add Action.NoAction to disable acknowledgement #57

Closed by ghost 2 years ago

ghost commented 2 years ago

With this PR I am expanding the Action set with the NoAction value.

Currently, acknowledgements are either automatic (autoack) or issued after each message is processed. By returning Action.NoAction from a handler, a user can settle the message manually at some later time by calling Delivery.Ack, Delivery.Nack or Delivery.Reject. This makes batch processing possible: Delivery.Ack or its negative counterpart Delivery.Nack is called with multiple set to true, which settles every outstanding delivery on the channel up to and including that one. I have verified that this approach works for batch processing:

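// batch holds the deliveries collected so far.
// This assumes the handler is not called concurrently.
var batch []rabbitmq.Delivery

func handler(d rabbitmq.Delivery) rabbitmq.Action {
    batch = append(batch, d)
    if len(batch) == 5 {
        err := batchProcessor.Process(batch)
        if err != nil {
            // Reject the whole batch (multiple=true) without requeueing.
            d.Nack(true, false)
        } else {
            // Acknowledge the whole batch (multiple=true).
            d.Ack(true)
        }
        batch = nil
    }

    // Skip the automatic acknowledgement for this delivery.
    return rabbitmq.NoAction
}

lgtti commented 2 years ago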

If I may offer some advice before you write this feature: the solution you describe lacks an important capability. How can I commit only a subset of the messages in a batch? For example, I could process a batch of 5 elements of which only 3 are handled successfully, so I need to acknowledge only those 3 and not the entire batch.

func handler(d rabbitmq.Delivery) rabbitmq.Action {
    batch = append(batch, d)
    if len(batch) == 5 {
        processResult := batchProcessor.Process(batch)
        // Settle each delivery on its own (multiple=false).
        for _, successDelivery := range processResult.Success {
            successDelivery.Ack(false)
        }
        for _, failedDelivery := range processResult.Failed {
            // requeue=false mirrors the earlier example; pass true to requeue.
            failedDelivery.Nack(false, false)
        }
        batch = nil
    }

    return rabbitmq.NoAction
}

In addition, what happens when no more messages arrive and the batch is not yet full? Is there a timeout?

wagslane commented 2 years ago

I've actually implemented batch processing with this method. Basically, you just have the entire batch of consumed messages block within their handlers. Once the batch is done, you call the appropriate ACK or NACK. I think for anything more advanced, users should just use the lower-level AMQP library.
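One possible reading of the blocking approach described above, as a rough sketch and not the maintainer's actual code: each handler call parks until its batch has been processed, then returns the action for its own message. `Action`, `Ack` and `NackDiscard` below are local stand-ins for the library's action values, and `BlockingBatcher` and `Handle` are hypothetical names. The sketch assumes the consumer runs at least as many concurrent handler goroutines as the batch size, and it has no timeout, so a partial batch blocks until more messages arrive.

```go
package main

import "sync"

// Action is a local stand-in for the library's handler return value.
type Action int

const (
	Ack Action = iota
	NackDiscard
)

// pending is one message waiting for the outcome of its batch.
type pending struct {
	body   []byte
	result chan Action
}

// BlockingBatcher makes every handler call wait until its batch is done.
type BlockingBatcher struct {
	mu      sync.Mutex
	batch   []pending
	size    int
	process func(bodies [][]byte) error
}

// Handle blocks the calling handler goroutine until the batch containing
// this message has been processed, then returns the action to apply.
func (b *BlockingBatcher) Handle(body []byte) Action {
	p := pending{body: body, result: make(chan Action, 1)}

	b.mu.Lock()
	b.batch = append(b.batch, p)
	var full []pending
	if len(b.batch) == b.size {
		// This call completes the batch, so it takes ownership of it.
		full, b.batch = b.batch, nil
	}
	b.mu.Unlock()

	if full != nil {
		bodies := make([][]byte, len(full))
		for i, q := range full {
			bodies[i] = q.body
		}
		action := Ack
		if err := b.process(bodies); err != nil {
			action = NackDiscard
		}
		// Release every waiting handler, including this one.
		for _, q := range full {
			q.result <- action
		}
	}
	return <-p.result
}
```

A handler would then reduce to something like `return batcher.Handle(d.Body)` mapped onto the library's own action values. Because every message is settled by its own handler's return value, this variant needs no NoAction and no manual Ack calls, at the cost of one blocked goroutine per in-flight message.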