streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

No message received on Flow channel #512

Open Bedotech opened 3 years ago

Bedotech commented 3 years ago

Hi, thanks for your great work on the AMQP implementation! I am having a problem detecting when a channel enters flow mode with RabbitMQ 3.8.15. After creating the channel, I start a goroutine that checks whether the server sends any flow notifications:

// Flow is sent to the publisher channel when the broker can't accept new messages.
notifyFlow := channel.NotifyFlow(make(chan bool))

go func() {
    for {
        select {
        case flow, ok := <-notifyFlow:
            if !ok {
                return
            }

            if flow {
                // On flow true, resume all publishing.
                publisher.lock.Unlock()
            } else {
                // On flow false, stop all publishing.
                publisher.lock.Lock()
            }
        case <-publisher.ctx.Done():
            return
        }
    }
}()

The channel isn't in confirm or transaction mode. On the same channel I publish 1 million messages in a loop, and on the broker side the connection is in the flow state:

bash-5.1# rabbitmqctl list_connections
Listing connections ...
user    peer_host   peer_port   state
guest   172.20.0.1  62810   flow
bash-5.1# rabbitmqctl list_channels
Listing channels ...
pid user    consumer_count  messages_unacknowledged
<rabbit@b929e6a54eed.1627889825.26180.20>   guest   0   0
bash-5.1#

However, no notification is ever delivered on the channel passed to NotifyFlow. Any ideas?

Bedotech commented 3 years ago

OK, I think I get it now: modern versions of RabbitMQ use TCP back pressure. Is there any way to detect this behaviour?