wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
797 stars 129 forks source link

doubt about startNotifyFlowHandler #13

Closed pwli0755 closed 3 years ago

pwli0755 commented 3 years ago
func (publisher *Publisher) startNotifyFlowHandler() {
    for ok := range publisher.notifyFlowChan {
        publisher.disablePublishDueToFlowMux.Lock()
        publisher.logger.Printf("pausing publishing due to flow request from server")
        if ok {
            publisher.disablePublishDueToFlow = false
        } else {
            publisher.disablePublishDueToFlow = true
        }
        publisher.disablePublishDueToFlowMux.Unlock()
        publisher.logger.Printf("resuming publishing due to flow request from server")
    }
}

but I see the following comment in amqp (channel.go#L49):

    // Listeners for active=true flow control.  When true is sent to a listener,
    // publishing should pause until false is sent to listeners.
    flows []chan bool

Maybe startNotifyFlowHandler should behave like this?

func (publisher *Publisher) startNotifyFlowHandler() {
    // Listeners for active=true flow control.  When true is sent to a listener,
    // publishing should pause until false is sent to listeners.
    for ok := range publisher.notifyFlowChan {
        publisher.disablePublishDueToFlowMux.Lock()
        if ok {
            publisher.logger.Printf("pausing publishing due to flow request from server")
            publisher.disablePublishDueToFlow = true
        } else {
            publisher.disablePublishDueToFlow = false
            publisher.logger.Printf("resuming publishing due to flow request from server")
        }
        publisher.disablePublishDueToFlowMux.Unlock()
    }
}

pls the me know if I get this wrong

wagslane commented 3 years ago

I think you're right. Would you like to make the PR? If not I'll take care of it later