streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Unable to use channel.Notify* if the same channel is used #467

Closed bentcoder closed 1 year ago

bentcoder commented 4 years ago

Hi,

Using the same channel for publishing messages prevents us from using the channel.NotifyPublish, channel.NotifyReturn and channel.NotifyConfirm methods. As soon as the first few messages are delivered, we cannot receive any more notifications through these methods. Something blocks the select, and it prints "timeout". See the example below.

All I need is to confirm whether the message made it to the queue or not. Happy to see a different solution.

Thanks


type Messenger struct {
    conn *amqp.Connection
    chnl *amqp.Channel         // channel.Confirm(false) registered for this.
}

func (m Messenger) ProduceWithFreshChannel(id string, msg []byte) error {
    channel, err := m.conn.Channel()
    if err != nil {
        return err
    }
    if err := channel.Confirm(false); err != nil {
        return err
    }

    return produce(channel, id, msg)
}

func (m Messenger) ProduceWithDefaultChannel(id string, msg []byte) error {
    return produce(m.chnl, id, msg)
}

func produce(channel *amqp.Channel, id string, msg []byte) error {
    if err := channel.Publish(
        "exc-name",
        "create",
        true,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            MessageId:    id,
            ContentType:  "text/plain",
            Body:         msg,
        },
    ); err != nil {
        return err
    }

    select {
    case ntf := <-channel.NotifyPublish(make(chan amqp.Confirmation, 1)):
        if !ntf.Ack {
            return errors.New("failed to deliver message")
        }
    case <-channel.NotifyReturn(make(chan amqp.Return)):
        return errors.New("failed to deliver message")
    case <-time.After(time.Second):
        log.Println("timeout")
    }

    return nil
}

chrisDeFouRire commented 4 years ago

Hi @BentCoder

I may be wrong, as I'm still very new to RabbitMQ and this package, but I'm using confirmations on the same channel and it works, and I think I've found some issues with your code.

The way I do it:

I'm doing all of this on a single channel...

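    // Register the confirmation listener once, before publishing anything.
    published := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    // Put the channel into confirm mode.
    if err := channel.Confirm(false); err != nil {
        log.Fatalf("Can't enable confirms: %v\n", err)
    }
    for {
        msg := amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            Body:         []byte("Hi !"),
        }
        if err := channel.Publish("my-exchange", "routing", true, false, msg); err != nil {
            log.Fatalf("Can't publish: %v\n", err)
        }
        // Wait for the broker's confirmation before publishing the next message.
        confirm := <-published
        if confirm.Ack {
            log.Println("published")
        } else {
            // whatever
        }
    }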

From my point of view, it looks like you start listening for confirmations only after publishing. Also, I think you can't receive publish notifications on another channel, as they're sent to the channel you publish on.
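To make the "register once, before publishing" idea concrete, here is a minimal sketch that runs without a broker. It does not use the amqp package: `Confirmation`, `fakeChannel`, `Producer`, `NewProducer` and `Produce` are hypothetical names, with `Confirmation` and `fakeChannel` standing in for amqp.Confirmation and *amqp.Channel. The fake delivers its confirmation synchronously inside `Publish`, which is a simplification and not how the real library behaves.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Confirmation is a local stand-in for amqp.Confirmation (assumption).
type Confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// fakeChannel is a hypothetical stand-in for *amqp.Channel.
// It sends one confirmation per publish to every registered listener.
type fakeChannel struct {
	listeners []chan Confirmation
	tag       uint64
	nack      bool // when true, every publish is negatively acknowledged
}

// NotifyPublish registers a listener and returns it.
func (c *fakeChannel) NotifyPublish(l chan Confirmation) chan Confirmation {
	c.listeners = append(c.listeners, l)
	return l
}

// Publish pretends the broker confirms immediately (simplification).
func (c *fakeChannel) Publish(body []byte) error {
	c.tag++
	conf := Confirmation{DeliveryTag: c.tag, Ack: !c.nack}
	for _, l := range c.listeners {
		l <- conf
	}
	return nil
}

// Producer owns one channel and one confirmation listener for its lifetime.
type Producer struct {
	ch       *fakeChannel
	confirms chan Confirmation
}

// NewProducer registers the listener exactly once, before any publish.
func NewProducer(ch *fakeChannel) *Producer {
	return &Producer{
		ch:       ch,
		confirms: ch.NotifyPublish(make(chan Confirmation, 1)),
	}
}

// Produce publishes one message and waits for its confirmation.
func (p *Producer) Produce(body []byte) error {
	if err := p.ch.Publish(body); err != nil {
		return err
	}
	select {
	case c := <-p.confirms:
		if !c.Ack {
			return errors.New("broker nacked the message")
		}
		return nil
	case <-time.After(time.Second):
		return errors.New("timed out waiting for confirmation")
	}
}

func main() {
	p := NewProducer(&fakeChannel{})
	for i := 0; i < 5; i++ {
		if err := p.Produce([]byte("hi")); err != nil {
			fmt.Println("publish failed:", err)
			return
		}
	}
	fmt.Println("all messages confirmed")
}
```

With the real package, the same shape would presumably mean calling channel.Confirm and channel.NotifyPublish (and channel.NotifyReturn, if returns matter) once when the Messenger is built, storing the resulting Go channels in the struct, and only reading from them inside produce.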

I hope this helps (even though this issue is old).