streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Sent Nack for delivery but always dispatched Ack #349

Closed dehypnosis closed 6 years ago

dehypnosis commented 6 years ago

In confirm mode, a single process runs two goroutines, one for the subscriber and one for the publisher, each with its own connection and channel. When I send a nack for a delivery, an ack is always dispatched in the end.

I added debug prints to the library code as follows:

# amqp/channel.go:275
...
// Eventually called via the state machine from the connection's reader
// goroutine, so assumes serialized access.
func (ch *Channel) dispatch(msg message) {

    // print added
    fmt.Printf("Dispatch %T, %+v", msg, msg)
    switch m := msg.(type) {
    case *channelClose:
        // lock before sending connection.close-ok
        // to avoid unexpected interleaving with basic.publish frames if
        // publishing is happening concurrently
        ch.m.Lock()
        ch.send(&channelCloseOk{})
        ch.m.Unlock()
        ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))
...

# amqp/channel.go:1549
...
func (ch *Channel) Ack(tag uint64, multiple bool) error {
    ch.m.Lock()
    defer ch.m.Unlock()

    // print added
    fmt.Printf("Send ack!\n")
    return ch.send(&basicAck{
        DeliveryTag: tag,
        Multiple:    multiple,
    })
}

/*
Nack negatively acknowledges a delivery by its delivery tag.  Prefer this
method to notify the server that you were not able to process this delivery and
it must be redelivered or dropped.

See also Delivery.Nack
*/
func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
    ch.m.Lock()
    defer ch.m.Unlock()

    // print added
    fmt.Printf("Send nack!\n")
    return ch.send(&basicNack{
        DeliveryTag: tag,
        Multiple:    multiple,
        Requeue:     requeue,
    })
}

Here are the strange logs from my test code.

2018/09/22 15:44:51 [0] Publisher started...
Dispatch *amqp.channelOpenOk, &{reserved1:}Dispatch *amqp.exchangeDeclareOk, &{}Dispatch *amqp.confirmSelectOk, &{}

2018/09/22 15:44:51 [0] Subscriber started...
Dispatch *amqp.channelOpenOk, &{reserved1:}Dispatch *amqp.exchangeDeclareOk, &{}Dispatch *amqp.confirmSelectOk, &{}Dispatch *amqp.queueDeclareOk, &{Queue:consumer 0 MessageCount:0 ConsumerCount:0}Dispatch *amqp.queueBindOk, &{}Dispatch *amqp.basicConsumeOk, &{ConsumerTag:ctag-/var/folders/4h/8m5ffs0x3dv_8q_c517bcltm0000gn/T/go-build384005370/b001/exe/stress-1}

2018/09/22 15:44:51 Subscribed: consumer 0
Dispatch *amqp.basicDeliver, &{ConsumerTag:ctag-/var/folders/4h/8m5ffs0x3dv_8q_c517bcltm0000gn/T/go-build384005370/b001/exe/stress-1 DeliveryTag:1 Redelivered:false Exchange:grpc-proxy RoutingKey:test.hello.world2 Properties:{ContentType: ContentEncoding: Headers:map[X-Retry:{Scale:0 Value:0}] DeliveryMode:2 Priority:0 CorrelationId: ReplyTo: Expiration: MessageId: Timestamp:0001-01-01 00:00:00 +0000 UTC Type: UserId: AppId: reserved1:} Body:[116 101 115 116 32 98 111 100 121]}

2018/09/22 15:44:52 [consumer 0] Message received and confirmed: 1
Send nack!
Dispatch *amqp.basicAck, &{DeliveryTag:1 Multiple:false}

2018/09/22 15:44:52 [publisher 1] Message resolved: test.hello.world2 (1)
Dispatch *amqp.basicDeliver, &{ConsumerTag:ctag-/var/folders/4h/8m5ffs0x3dv_8q_c517bcltm0000gn/T/go-build384005370/b001/exe/stress-1 DeliveryTag:2 Redelivered:false Exchange:grpc-proxy RoutingKey:test.hello.world2 Properties:{ContentType: ContentEncoding: Headers:map[X-Retry:{Scale:0 Value:0}] DeliveryMode:2 Priority:0 CorrelationId: ReplyTo: Expiration: MessageId: Timestamp:0001-01-01 00:00:00 +0000 UTC Type: UserId: AppId: reserved1:} Body:[116 101 115 116 32 98 111 100 121]}

2018/09/22 15:44:53 [consumer 0] Message received and confirmed: 2
Send nack!
Dispatch *amqp.basicAck, &{DeliveryTag:2 Multiple:false}

2018/09/22 15:44:53 [publisher 1] Message resolved: test.hello.world2 (2)
^Csignal: interrupt

I don't know why the logs below appear. I tried to send a nack via the channel with (multiple: false, requeue: false). It seems there may be a bug in serialization/deserialization, or I am doing something wrong.

Send nack!
Dispatch *amqp.basicAck, &{DeliveryTag:2 Multiple:false}

Can you give me any hint?

dehypnosis commented 6 years ago

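I misunderstood AMQP itself. The basic.ack in the logs is the broker's publisher confirm for my publish, not a response to the consumer's nack. For anyone else who misunderstood as I did: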

ref: http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

The basic rules are as follows: an un-routable mandatory or immediate message is confirmed right after the basic.return; otherwise, a transient message is confirmed the moment it is enqueued; and, a persistent message is confirmed when it is persisted to disk or when it is consumed on every queue.
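To make the quoted rules easier to scan, here is a minimal sketch that encodes them as a decision function. It is only a model of the quote, not broker or library code: the `Message` type, the `ConfirmPoint` function, and the constant names are hypothetical and exist only for illustration. Note that none of the outcomes is "the consumer acked or nacked", which is why a consumer's nack never shows up as a nack on the publisher side.

```go
package main

import "fmt"

// Message carries only the flags the quoted rules mention.
// This type is hypothetical; it is not part of streadway/amqp.
type Message struct {
	Routable   bool // at least one queue matches the routing key
	Mandatory  bool // published with the mandatory (or immediate) flag
	Persistent bool // delivery mode 2
}

// Point names the moment at which the broker confirms a publish.
type Point string

const (
	AfterReturn             Point = "right after basic.return"
	WhenEnqueued            Point = "the moment it is enqueued"
	WhenPersistedOrConsumed Point = "when persisted to disk or consumed on every queue"
)

// ConfirmPoint applies the quoted rules in order. The quote does not
// spell out unroutable messages published without mandatory/immediate,
// so this sketch simply falls through to the remaining two rules.
func ConfirmPoint(m Message) Point {
	if !m.Routable && m.Mandatory {
		return AfterReturn
	}
	if !m.Persistent {
		return WhenEnqueued
	}
	return WhenPersistedOrConsumed
}

func main() {
	// The messages in the logs above use DeliveryMode:2 and are routed
	// to the "consumer 0" queue.
	fmt.Println(ConfirmPoint(Message{Routable: true, Persistent: true}))
}
```

In the logs above the messages are persistent and routable, so they fall under the last rule.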

For a synchronous publisher -> broker -> consumer -> broker -> publisher round trip, refer to the RPC pattern.