streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

[Help] Publish don't trigger any error #344

Closed bakayolo closed 6 years ago

bakayolo commented 6 years ago
func sendRBMQMessage(workflow []byte) error {
    err := ch.Publish(
        rmbqExchange,
        rmbqRoutingKey,
        true,  // mandatory
        false, // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            Timestamp:    time.Now(),
            ContentType:  "application/json",
            Body:         workflow,
        })
    return err
}

If the queue does not exist, err is nil. I assume err will also be nil if any other error occurs during the publish. How can I request an acknowledgement for a publish? I read somewhere that I could use some kind of listener, but I don't really understand how that works.

Thanks

bombsimon commented 6 years ago

The documentation states that:

... Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true. ... This can return an error when the channel, connection or socket is closed. The error or lack of an error does not indicate whether the server has received this publishing. ...

So there are "listeners" such as NotifyReturn and NotifyPublish. Each one takes a Go channel that you can then read from. You can read more about them in the documentation as well.

Regarding how they work, you simply pass these functions a channel and whenever a notification occurs you can read from that channel.

Example

// ch is the same channel you use in your code to publish.
returnErrChan := ch.NotifyReturn(make(chan amqp.Return))

go func() {
    notification, ok := <-returnErrChan
    if !ok {
        // The Go channel was closed, so there is no notification to log.
        return
    }

    log.Printf("A message got returned: %s %s\n", notification.RoutingKey, notification.ReplyText)
}()

You might want to do that in a loop and take proper action based on the result, for example when the channel is closed.

Hope this helps!

bakayolo commented 6 years ago

Hi @bombsimon Thanks for your answer and example. It helps a lot.

About your example, does it mean the go func will be called each time a message is returned? Or only once? Why do I need a loop here?

bombsimon commented 6 years ago

The func() is not some kind of callback, it's just an anonymous function spawned in a separate goroutine. The way it works is with the channel you create. I'm not sure how familiar you are with goroutines and channels, so sorry if this is too much detail.

The channel you create will receive a notification each time a published message is returned, so that might happen more than once. This means that if you want to keep tracking the result of your publishings for more than one message, you need to read from the channel for as long as your application is running.
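To make the "not a callback" point concrete, here is a minimal sketch that uses only plain Go channels, so it runs without a broker. The `returned` type and the `collect` function are hypothetical stand-ins made up for this illustration (`returned` plays the role of amqp.Return), and the sends in main stand in for the library delivering returned messages.

```go
package main

import "fmt"

// returned is a hypothetical stand-in for amqp.Return, so the sketch runs
// without a broker.
type returned struct {
	CorrelationId string
	ReplyText     string
}

// collect reads from returns until the channel is closed, then reports
// every CorrelationId it saw on done.
func collect(returns <-chan returned, done chan<- []string) {
	var ids []string
	for r := range returns { // the loop ends when returns is closed
		ids = append(ids, r.CorrelationId)
	}
	done <- ids
}

func main() {
	returns := make(chan returned)
	done := make(chan []string)

	// One goroutine, started once. It handles every value sent later.
	go collect(returns, done)

	// Stand-in for the library delivering two returned messages.
	returns <- returned{CorrelationId: "a", ReplyText: "NO_ROUTE"}
	returns <- returned{CorrelationId: "b", ReplyText: "NO_ROUTE"}
	close(returns)

	fmt.Println(<-done)
}
```

The goroutine is started a single time and the loop inside it is what keeps receiving. Without the loop, the function would return after the first value and later notifications would never be read.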

Since amqp.Return, which is what gets sent on the channel, contains the CorrelationId, you could use that field to identify which of your messages didn't get delivered. That means you need to keep track of which message was sent with which ID. The correlation ID is set on the amqp.Publishing, just like your body and content type.

A more complete version, which is why I suggest a loop, would look like this:

// Spawn a goroutine that handles every returned message.
go func() {
    for {
        notification, ok := <-returnErrChan
        if !ok {
            // Go channel closed, probably because the AMQP channel or connection was closed.
            return
        }

        messageFailing, ok := myMessageMap[notification.CorrelationId]
        if !ok {
            fmt.Println("No message mapped to ID:", notification.CorrelationId)
            continue
        }

        // Do what you need with messageFailing.
        _ = messageFailing // placeholder so the snippet compiles
    }
}()
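One thing to watch with myMessageMap: the publishing code writes to it while the goroutine above reads from it, and a plain Go map is not safe for that kind of concurrent use. Below is a minimal sketch of one way to guard it with a mutex. The `pendingStore` type and its `Add` and `Take` methods are hypothetical names invented for this example, not part of the amqp package.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingStore is a hypothetical helper that remembers message bodies by
// correlation ID. The mutex makes it safe to share between the publishing
// code and the goroutine that handles returns.
type pendingStore struct {
	mu   sync.Mutex
	msgs map[string][]byte
}

func newPendingStore() *pendingStore {
	return &pendingStore{msgs: make(map[string][]byte)}
}

// Add records body under id. Call it before publishing the message.
func (p *pendingStore) Add(id string, body []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.msgs[id] = body
}

// Take removes and returns the body stored under id, if any.
func (p *pendingStore) Take(id string) ([]byte, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	body, ok := p.msgs[id]
	delete(p.msgs, id)
	return body, ok
}

func main() {
	store := newPendingStore()
	store.Add("order-1", []byte(`{"id":1}`))

	// In the return handler, notification.CorrelationId would be the key.
	if body, ok := store.Take("order-1"); ok {
		fmt.Printf("returned message: %s\n", body)
	}
	_, ok := store.Take("order-1")
	fmt.Println("still pending:", ok)
}
```

Note that only undeliverable messages come back through NotifyReturn, so entries for messages that were routed normally are never taken out by the return handler. In a long-running program they would need some other cleanup.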

bakayolo commented 6 years ago

Thanks a lot man. Indeed I was not very familiar with routines and channels (just started with go). I am more now. :-) Working just fine. 😬 Thanks for your help, really appreciate! 👍