streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License
4.88k stars 621 forks source link

What should I do if publishing confirm failed with the 'deliveryTag' #507

Closed Icarus9913 closed 2 years ago

Icarus9913 commented 3 years ago

Sorry for asking this simple question. If I publish one msg and get the confirmations (confirmations.ack==false). In this situation, how can I re-publish this msg? Should I use channel.Nack()? And how can I imitate the return confirmation ack is false?

  func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if nil != err {
        panic(err)
    }

    ch, err := conn.Channel()
    if nil != err {
        panic(err)
    }

    queue, err := ch.QueueDeclare("hey", true, false, false, false, nil)
    if nil != err {
        panic(err)
    }

    if err = ch.Confirm(false); nil != err {
        panic(err)
    }

    confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

    body := "hello"
    err = ch.Publish("", queue.Name, false, false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        },
    )
    if nil != err {
        panic("sending failed: " + err.Error())
    }

    if confirmed := <-confirms; confirmed.Ack {
        fmt.Println("good")
    } else {
        fmt.Println("what should I do?")
        tag := confirmed.DeliveryTag
        ch.Nack(tag, false, true) //Is that right to use this method? But I can't imitate this situation. I want it to re-publish
    }
  }
Icarus9913 commented 3 years ago

I mean that I can not locate the exact msg by the confirmed.DeliveryTag. I wanna try to re-publish this msg if the confirmed.Ack is false.

ArtyomGabeev commented 3 years ago

@Icarus9913 you need to preserve an inflight messages by our own, example using map[uint64]amqp.Publishing.

samyonr commented 3 years ago

As mentions above, you need to store your messages coupled with the delivery number. Once confirmed, remove it from the map, if it's not confirmed, re-publish it again (probably up to certain amount of attempts). You can manually calculate the delivery number, or use the mechanism in #478 that was merged but not released. However, the exact same commit was released in rabbitmq/amqp091-go: https://github.com/rabbitmq/amqp091-go/releases/tag/v1.2.0