streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Prefetch not assigned to first channel #502

Open osok opened 3 years ago

osok commented 3 years ago

I have an application that consumes messages from a channel. Processing a single message can take a few minutes. When the application picks up a message, it builds one of four configurations. Some builds are short, so I want a worker that finishes early to pick up the next message.

Server --> (writes four messages) ----> I have four processes consuming [Builder]

The builder is the application. I have it set to prefetch just one message from the channel.

    // Connect to the RabbitMQ instance
    connection, err := amqp.Dial(url)
    if err != nil {
        util.FailOnError(err, "could not establish connection with RabbitMQ in main")
        return
    }
    // Defer the close only after the dial succeeded, so connection is never nil here
    defer connection.Close()

    channel, err := connection.Channel()
    if err != nil {
        util.FailOnError(err, "could not open first RabbitMQ channel in main")
        return
    }

    // Only pick up one message at a time.  Since some of the workers will take a long time to compile all the
    // executables, this helps to allow other workers to pick up work.
    err = channel.Qos(1, 0, true)
    if err != nil {
        util.FailOnError(err, "could not set prefetch in main")
        return
    }

    // We create an exchange that will bind to the queue to send and receive messages
    err = channel.ExchangeDeclare(builder2.BUILD_EXCHANGE, "topic", true, false, false, false, nil)

    if err != nil {
        util.FailOnError(err, "error declaring exchange in main")
        return
    }

    // We bind the queue to the exchange to send and receive data from the queue
    err = channel.QueueBind(builder2.BUILD_QUEUE, "#", builder2.BUILD_EXCHANGE, false, nil)
    if err != nil {
        // The failed bind closed the channel, so we need to reopen it
        channel, err = connection.Channel()
        if err != nil {
            util.FailOnError(err, "could not open RabbitMQ channel in main")
            return
        }
        err = declareQueue(channel)
        if err != nil {
            util.FailOnError(err, "error binding to the queue in main")
            return
        }
    }

    // We consume data from the build queue (manual acks) using the channel we created.
    msgs, err := channel.Consume(builder2.BUILD_QUEUE, "", false, false, false, false, nil)

So when I run the Builder four times, I get four channels. The prefetch is properly set for channels 2, 3 and 4, but not for channel 1.

So when the server fires off the first batch, and then a second, I see five messages in the first channel and one in each of the other three.
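
A possible cause, sketched under assumptions: in the code above, the fallback path opens a second channel after `QueueBind` fails, and `Qos` is only called on the first one. If the first Builder is the one that finds the queue missing, it may end up consuming on a channel that never had a prefetch limit set. The sketch below routes every channel open through one helper so the limit is always applied. `qosSetter`, `openWithPrefetch` and `fakeChannel` are hypothetical names used for illustration; only the `Qos(prefetchCount, prefetchSize int, global bool) error` signature is taken from `*amqp.Channel`. The fake stands in for a real broker connection so the example runs on its own.

```go
package main

import "fmt"

// qosSetter is the part of the channel API this sketch relies on.
// *amqp.Channel provides a Qos method with this signature.
type qosSetter interface {
	Qos(prefetchCount, prefetchSize int, global bool) error
}

// openWithPrefetch is a hypothetical helper: it opens a channel with the
// supplied function and applies the prefetch limit before handing it back,
// so a reopened channel cannot be used without the limit.
func openWithPrefetch(open func() (qosSetter, error), prefetch int) (qosSetter, error) {
	ch, err := open()
	if err != nil {
		return nil, fmt.Errorf("open channel: %w", err)
	}
	// Same arguments as the Qos call in the issue: count, size 0, global true.
	if err := ch.Qos(prefetch, 0, true); err != nil {
		return nil, fmt.Errorf("set prefetch: %w", err)
	}
	return ch, nil
}

// fakeChannel stands in for *amqp.Channel and records the last prefetch count.
type fakeChannel struct {
	prefetch int
}

func (f *fakeChannel) Qos(prefetchCount, prefetchSize int, global bool) error {
	f.prefetch = prefetchCount
	return nil
}

func main() {
	// With the real client this function would wrap connection.Channel().
	open := func() (qosSetter, error) { return &fakeChannel{}, nil }

	first, err := openWithPrefetch(open, 1)
	if err != nil {
		panic(err)
	}
	// Pretend QueueBind failed and closed the first channel; reopen it.
	second, err := openWithPrefetch(open, 1)
	if err != nil {
		panic(err)
	}
	// Both channels carry the limit; prints "1 1".
	fmt.Println(first.(*fakeChannel).prefetch, second.(*fakeChannel).prefetch)
}
```

With the real client, the helper would more likely be written against `*amqp.Connection` and return `*amqp.Channel` directly, so the result can be passed straight to `Consume`.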

go version go1.14.2 linux/amd64. I pulled amqp as recently as today, 4/10/2021.
