streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Memory growing with a slow consumer? #489

Open thothothotho opened 3 years ago

thothothotho commented 3 years ago

About that: https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/consumers.go#L72

I suppose the buffering was added there to consume the server flow as fast as possible. But if the consumer is slow and autoAck is true, it means the memory can grow without limit. No more back pressure.

I'm a bit of a noob with RabbitMQ, so maybe this is standard behavior for a driver. From the docs, I thought acks were mostly for data safety (which I don't care about for my use case). I still expected proper flow control with backpressure when autoAck == true. So I would have expected the driver to stop reading the TCP stream / RabbitMQ channel when the consumer does not read from the Go channel.

What is the status on this? If the current unlimited buffering is intended, then it should be clearly documented.

Thanks for your help.

binlaniua commented 2 years ago

Same for me.

We use RabbitMQ to receive canal messages.

The client consumes messages one by one, so memory grows very fast; the process gets killed by the system and keeps restarting.

binlaniua commented 2 years ago

See https://github.com/binlaniua/amqp091-go/blob/1036b2488284b330aee639ad1deae1a0ead578eb/consumers.go#L54

That fixed it: memory is stable and no longer grows.

lukebakken commented 2 years ago

> I'm a bit of a noob with RabbitMQ, so maybe this is standard behavior for a driver. From the docs, I thought acks were mostly for data safety (which I don't care about for my use case). I still expected proper flow control with backpressure when autoAck == true. So I would have expected the driver to stop reading the TCP stream / RabbitMQ channel when the consumer does not read from the Go channel.

RabbitMQ does not use TCP flow control as a source of backpressure (at least, as far as I can tell after rummaging through the source for 15 minutes). It uses the ack mechanism for this. So with autoAck, your consumer had better keep up. As I'm sure you know, by using autoAck you give up data safety guarantees as well.

A better solution is to use a combination of prefetch and batch acks.
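
A minimal sketch of what prefetch plus batch acks could look like, under some assumptions: `ConsumeBatched`, `Acker`, and `AckFunc` are hypothetical names introduced here, not part of the library. The helper only relies on an `Ack(multiple bool) error` method, which is the shape `amqp.Delivery` exposes, so the block compiles without the library. The prefetch side is a `Qos` call on the channel, shown in the trailing comment with a made-up queue name.

```go
package main

import "time"

// Acker is the one method this sketch needs from a delivery. It has the same
// shape as amqp.Delivery.Ack in streadway/amqp: with multiple=true, the ack
// covers this delivery and every earlier unacknowledged one on the channel.
type Acker interface {
	Ack(multiple bool) error
}

// AckFunc adapts a plain function to Acker (hypothetical, handy for tests).
type AckFunc func(multiple bool) error

// Ack calls the wrapped function.
func (f AckFunc) Ack(multiple bool) error { return f(multiple) }

// ConsumeBatched (hypothetical helper) handles deliveries one at a time and
// sends a single multiple-ack per batchSize deliveries, or when flushEvery
// elapses while handled deliveries are still unacked. flushEvery must be
// positive. It returns when deliveries is closed or an ack fails.
func ConsumeBatched[D Acker](deliveries <-chan D, batchSize int, flushEvery time.Duration, handle func(D)) error {
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()

	var last D   // most recently handled delivery
	pending := 0 // handled deliveries not yet acked

	// flush acks everything handled so far with one multiple-ack.
	flush := func() error {
		if pending == 0 {
			return nil
		}
		pending = 0
		return last.Ack(true)
	}

	for {
		select {
		case d, ok := <-deliveries:
			if !ok {
				// Channel closed: try to ack the tail. With a real broker
				// this may fail if the AMQP channel is already gone.
				return flush()
			}
			handle(d)
			last = d
			pending++
			if pending >= batchSize {
				if err := flush(); err != nil {
					return err
				}
			}
		case <-ticker.C:
			// Periodic flush so a partial batch is not left unacked forever.
			if err := flush(); err != nil {
				return err
			}
		}
	}
}

// Wiring with streadway/amqp (not compiled here; ch is an open *amqp.Channel,
// "my-queue" and process are placeholders):
//
//	ch.Qos(100, 0, false) // prefetch: at most 100 unacked deliveries in flight
//	msgs, _ := ch.Consume("my-queue", "", false, false, false, false, nil) // autoAck=false
//	err := ConsumeBatched(msgs, 50, time.Second, func(d amqp.Delivery) { process(d.Body) })
```

With autoAck off, the prefetch count caps how many unacked deliveries the broker will push, which bounds what the client has to buffer. The batch size should stay at or below the prefetch count; otherwise a batch can only complete through the timer flush.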