streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Returns are not always delivered before ACKs (but they should be) #351

Open gm42 opened 5 years ago

gm42 commented 5 years ago

Quoting RabbitMQ documentation (https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed):

When Will Published Messages Be Confirmed by the Broker? For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.

From the above I deduce that a generic client can detect the existence of a return by the fact that such return is always received before the ACK.

However, when using this AMQP client library there is no guarantee that this order is respected, and I am in fact receiving returns after ACKs.

I can produce a test case, but can somebody please confirm whether this is by design or not?

My problem is (apparently) simple: send multiple messages concurrently, verify that each of them has been received and routed (with immediate=false, but durable=true). If the server-side constraint of "returns are ordered before ACKs" is not respected, this becomes impossible.

I also suggest adding an example of consistent (transactional) message delivery using a concurrent pattern; without one, the usage and capabilities of all the channels involved, including proper cleanup, may be unclear. We can address that separately in a PR.

Edit: I also noticed that returns carry no DeliveryTag, so they probably cannot be associated with a specific publish that way. I am using the application-level MessageId to match returns to messages in my test.
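To illustrate why the ordering matters for this kind of matching, here is a minimal sketch that classifies messages from a single ordered stream of events. `Event` and `Classify` are hypothetical names and the event shape is a simplification, not the library's types. The sketch assumes the publisher already knows which delivery tag belongs to which MessageId.

```go
package main

import "fmt"

// Event is a hypothetical, flattened view of the three frames involved.
type Event struct {
	Kind      string // "return", "ack" or "nack"
	MessageID string // set on returns only (returns carry no delivery tag)
	Tag       uint64 // set on acks and nacks only
	Multiple  bool   // ack/nack covers every tag up to and including Tag
}

// Classify walks events in arrival order. published maps each delivery tag
// to its message ID. A message is "unroutable" only if its return was seen
// before the ack or nack that settles it.
func Classify(published map[uint64]string, events []Event) map[string]string {
	returned := make(map[string]bool)
	outcome := make(map[string]string)

	settle := func(tag uint64, acked bool) {
		id, ok := published[tag]
		if !ok {
			return
		}
		if _, done := outcome[id]; done {
			return // already settled by an earlier ack or nack
		}
		switch {
		case returned[id]:
			outcome[id] = "unroutable"
		case acked:
			outcome[id] = "routed"
		default:
			outcome[id] = "rejected"
		}
	}

	for _, ev := range events {
		switch ev.Kind {
		case "return":
			returned[ev.MessageID] = true
		case "ack", "nack":
			acked := ev.Kind == "ack"
			if !ev.Multiple {
				settle(ev.Tag, acked)
				continue
			}
			for tag := range published {
				if tag <= ev.Tag {
					settle(tag, acked)
				}
			}
		}
	}
	return outcome
}

func main() {
	published := map[uint64]string{1: "a", 2: "b"}

	// Return for "b" arrives before its ack: "b" is reported unroutable.
	inOrder := []Event{
		{Kind: "ack", Tag: 1},
		{Kind: "return", MessageID: "b"},
		{Kind: "ack", Tag: 2},
	}
	// Return for "b" arrives after its ack: "b" is wrongly reported routed.
	swapped := []Event{
		{Kind: "ack", Tag: 1},
		{Kind: "ack", Tag: 2},
		{Kind: "return", MessageID: "b"},
	}

	fmt.Println(Classify(published, inOrder))
	fmt.Println(Classify(published, swapped))
}
```

With the swapped order, the only ways to avoid the misclassification are to wait an arbitrary time after each ack or to revise outcomes after the fact, which is why the broker-side ordering needs to survive in the client.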

gm42 commented 5 years ago

This is my (dirty) fix:

diff --git a/channel.go b/channel.go
index dd2552c..f90d823 100644
--- a/channel.go
+++ b/channel.go
@@ -297,7 +297,12 @@ func (ch *Channel) dispatch(msg message) {
        }
        ch.notifyM.RUnlock()
        ch.consumers.cancel(m.ConsumerTag)
-
+       /*
+          # When Will Published Messages Be Confirmed by the Broker?
+          For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue
+          (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client
+          before basic.ack. The same is true for negative acknowledgements (basic.nack).
+       */
    case *basicReturn:
        ret := newReturn(*m)
        ch.notifyM.RLock()
@@ -307,6 +312,14 @@ func (ch *Channel) dispatch(msg message) {
        ch.notifyM.RUnlock()

    case *basicAck:
+       // send the ACK also on the return channel, so that the sequence there is always preserved
+       ret := newAckReturn(true, m.DeliveryTag, m.Multiple, false)
+       ch.notifyM.RLock()
+       for _, c := range ch.returns {
+           c <- *ret
+       }
+       ch.notifyM.RUnlock()
+
        if ch.confirming {
            if m.Multiple {
                ch.confirms.Multiple(Confirmation{m.DeliveryTag, true})
@@ -316,6 +329,14 @@ func (ch *Channel) dispatch(msg message) {
        }

    case *basicNack:
+       // send the NACK also on the return channel, so that the sequence there is always preserved
+       ret := newAckReturn(false, m.DeliveryTag, m.Multiple, m.Requeue)
+       ch.notifyM.RLock()
+       for _, c := range ch.returns {
+           c <- *ret
+       }
+       ch.notifyM.RUnlock()
+
        if ch.confirming {
            if m.Multiple {
                ch.confirms.Multiple(Confirmation{m.DeliveryTag, false})
diff --git a/return.go b/return.go
index 10dcedb..feefad8 100644
--- a/return.go
+++ b/return.go
@@ -34,6 +34,23 @@ type Return struct {
    AppId           string    // application use - creating application

    Body []byte
+
+   // payload for ACK/NACK
+   ACKType     bool
+   ACK         bool
+   Multiple    bool
+   Requeue     bool
+   DeliveryTag uint64
+}
+
+func newAckReturn(isACK bool, deliveryTag uint64, multiple, requeue bool) *Return {
+   return &Return{
+       ACKType:     true,
+       ACK:         isACK,
+       DeliveryTag: deliveryTag,
+       Multiple:    multiple,
+       Requeue:     requeue,
+   }
 }

 func newReturn(msg basicReturn) *Return {

It may be dirty, but at least it makes returns and ACKs arrive in the correct order.

I think a proper solution should rework confirmations along these lines, so that their order relative to RETURNs is preserved.
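One possible shape for such a rework, sketched under assumptions: instead of overloading `Return`, the dispatch loop could publish a dedicated event type on a single channel. `PublishEvent` and `Dispatcher` are hypothetical and do not exist in the library; the sketch relies only on the fact that a single Go channel delivers values in the order they were sent, whereas two separate channels have no ordering relative to each other.

```go
package main

import "fmt"

// PublishEvent is a hypothetical unified notification covering returns,
// acks and nacks, so that all three share one ordered stream.
type PublishEvent struct {
	Kind      string // "return", "ack" or "nack"
	MessageID string // returns only
	Tag       uint64 // acks and nacks only
	Multiple  bool   // acks and nacks only
}

// Dispatcher stands in for the library's frame dispatch loop.
type Dispatcher struct {
	events chan PublishEvent
}

func NewDispatcher(buffer int) *Dispatcher {
	return &Dispatcher{events: make(chan PublishEvent, buffer)}
}

// Events exposes the single ordered stream to the application.
func (d *Dispatcher) Events() <-chan PublishEvent { return d.events }

// Dispatch is called once per incoming frame, in wire order.
func (d *Dispatcher) Dispatch(ev PublishEvent) { d.events <- ev }

// Close ends the stream once no more frames will arrive.
func (d *Dispatcher) Close() { close(d.events) }

func main() {
	d := NewDispatcher(8)

	// Frames as the broker would send them for an unroutable mandatory
	// message: the return first, then the confirm.
	d.Dispatch(PublishEvent{Kind: "return", MessageID: "b"})
	d.Dispatch(PublishEvent{Kind: "ack", Tag: 1})
	d.Close()

	for ev := range d.Events() {
		fmt.Printf("%s id=%q tag=%d\n", ev.Kind, ev.MessageID, ev.Tag)
	}
}
```

The consumer sees events in wire order regardless of buffer size, which is the same property the patch above obtains by pushing ACKs and NACKs onto the return channel.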

michaelklishin commented 5 years ago

Thanks, looks like you are onto something. I don't like how returns and acks are intertwined in your example but cannot think of a better solution without reviewing how return handlers are implemented.

gm42 commented 5 years ago

> I don't like how returns and acks are intertwined in your example

Me neither! I was hoping that by showing where the pain is, you could come up with something more elegant? Although I don't see how it would be possible without some refactoring.