streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

When I consume from a queue, the queue goes up #530

Closed rlewkowicz closed 2 years ago

rlewkowicz commented 2 years ago

I have tried a variety of settings to see what would stick, so some of this may be superfluous.

    args := make(amqp.Table)
    args["x-message-ttl"] = int32(2000)
    args["x-max-length"] = int32(100)

    _, err = channelRabbitMQ.QueueDeclare(
        "udp", // queue name
        false, // durable
        false, // auto delete
        false, // exclusive
        true,  // no wait
        args,  // arguments
    )

This works. I load messages into a queue and it never gets too big.

However, the queue starts to grow rapidly once I start consuming, and I can't for the life of me understand why. In advance: I don't want the queued message to ever be removed. I simply want it to be "consumed" but remain until its expiry (I even have message expiry set).


    args := make(amqp.Table)
    args["x-message-ttl"] = int32(2000)
    args["x-max-length"] = int32(100)

    messages, err := channelRabbitMQ.Consume(
        "udp", // queue name
        "",    // consumer
        false, // auto-ack
        false, // exclusive
        false, // no local
        true,  // no wait
        args,  // arguments
    )
    if err != nil {
        log.Println(err)
    }

First, I cannot find the darn x arguments for rabbit consume. I don't even think these args here do anything.

Anyhow, I've tried just about every combination of these positional values, and it either drains the queue to 0 or, on the contrary, the queue quickly builds above the set limits.

Now, if I nack each message with a requeue, it does work, but that drives up CPU for both RabbitMQ and my Go binary:

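    for message := range messages {
        // Forward the message body over conn.
        if _, err := conn.Write(message.Body); err != nil {
            fmt.Printf("Couldn't send response %v\n", err)
        }
        // Nack only this message (multiple=false) and requeue it (requeue=true).
        message.Nack(false, true)
    }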

I understand from the docs that a message can't expire until it reaches the head of the queue, along with some other TTL caveats, but I don't see how they apply here. If the queue is stable without intervention, why would consuming messages cause it to grow?

TL;DR: could you give me pointers on how to consume the queue transparently, so that it stays as stable as when it's not being consumed? Alternatively, what is the most efficient way to send nacks?

rlewkowicz commented 2 years ago

In case anyone ever lands here with the same question, here is my possibly less-than-ideal answer. This at least helped my CPU issues.

In my consume loop, I simply keep a counter. After 400 deliveries, it nacks the 401st message with true, true (I can't find the docs, I just saw it earlier so I remember what it is), which essentially says: nack this message and all the unacknowledged ones before it, and requeue them. I still don't fully understand WHY this is happening, but I'm pretty sure the messages go back to the queue, which then respects the expiration and dev nulls them.
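For anyone who wants to try the counter approach, here is a minimal sketch of it, not a definitive implementation. A likely reason the queue grows (as far as I can tell from the RabbitMQ docs) is that x-max-length counts only ready messages, and messages delivered with auto-ack off stay unacknowledged until they are acked or nacked, so they pile up outside that limit. The names `nacker`, `batchNacker`, `observe`, and `fakeDelivery` are hypothetical and exist only for this illustration; the one library fact assumed is that `amqp.Delivery` has a `Nack(multiple, requeue bool) error` method, so a real delivery can be passed where the fake one is used.

```go
package main

import "fmt"

// nacker is a hypothetical interface covering the one method used here.
// amqp.Delivery's Nack(multiple, requeue bool) error satisfies it.
type nacker interface {
	Nack(multiple, requeue bool) error
}

// batchNacker (hypothetical helper) counts deliveries and nacks once
// per batch instead of once per message.
type batchNacker struct {
	every int // nack on every `every`-th delivery
	seen  int // deliveries counted since the last nack
}

// observe counts one delivery. On the `every`-th delivery it sends
// Nack(true, true): multiple=true covers this delivery and all earlier
// unacknowledged ones on the channel, requeue=true returns them to the
// queue. It reports whether a nack was sent.
func (b *batchNacker) observe(d nacker) (bool, error) {
	b.seen++
	if b.seen < b.every {
		return false, nil
	}
	b.seen = 0
	return true, d.Nack(true, true)
}

// fakeDelivery stands in for amqp.Delivery so the sketch runs without
// a broker. It records how many batch nacks were requested.
type fakeDelivery struct{ nacks *int }

func (f fakeDelivery) Nack(multiple, requeue bool) error {
	if multiple && requeue {
		*f.nacks++
	}
	return nil
}

func main() {
	nacks := 0
	b := &batchNacker{every: 401} // 400 deliveries, then nack the 401st
	for i := 0; i < 1000; i++ {
		if _, err := b.observe(fakeDelivery{&nacks}); err != nil {
			fmt.Println("nack failed:", err)
		}
	}
	fmt.Println("batch nacks sent:", nacks)
}
```

In a real consumer the loop would be `for message := range messages { ...; b.observe(message) }`. The batch size trades CPU against how many messages sit unacknowledged between nacks, so 401 is just the value from my setup, not a recommendation.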

rlewkowicz commented 2 years ago

Overall, I feel RabbitMQ kinda sucks, and I'm trying Kafka.

lukebakken commented 2 years ago

I'm leaving this here for other people who may come across this GH issue. This repository is not officially maintained by the RabbitMQ core team. Instead, use this library:

https://github.com/rabbitmq/amqp091-go

In addition, opening GH issues may not get the RabbitMQ team's attention. The mailing list may be a better place to ask:

https://groups.google.com/g/rabbitmq-users

Finally, to address the issue here:

> In advance, I don't want the queue message to ever be removed, I simply want it to be "consumed" but have it remain until it's expiry

RabbitMQ streams would be the correct solution for this use-case - https://www.rabbitmq.com/streams.html

https://github.com/rabbitmq/rabbitmq-stream-go-client