streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License
4.88k stars 621 forks source link

Memory usage grows if channel is not serviced #434

Closed jpmeijers closed 4 years ago

jpmeijers commented 4 years ago

I have a piece of code that subscribes to RabbitMQ, pushes the data to a go channel, another goroutine listens on the channel for messages and inserts the messages into a database: https://github.com/ttnmapper/mysql-insert-raw/blob/master/main.go#L104

The full architecture of the test system is:

+----------------+     +----------+     +------------+     +----------------+
| Python program | --> | RabbitMQ | --> | Go program | --> | MySQL/Postgres |
+----------------+     +----------+     +------------+     +----------------+

After running for a while the Python program has published 1.5 million messages. At the same time the go program has inserted 1 million messages into the database. The expected behaviour is that the 500'000 messages that has not yet bee inserted is still in the queue. If one however stops the go program and restart it, there is nothing in RabbitMQ's queue. Which means that the go program already read everything from rabbit and cached it internally. My channels are all defined with size 1, so it can't be cached there. It seems like it is cached internally in the amqp library.

To confirm this one can also see that the memory usage of the go program increases as the difference between published messages and database inserted messages increase.

How can one disable this internal caching of messages in the amqp library, so that messages won't get lost when the go program crashes?

lukebakken commented 4 years ago

You are making a serious mistake by using auto-ack in your code. If your code crashes, all messages that have been delivered to it but not yet processed will be lost.

You should use channel prefetch and manual acknowledgement. Prefetch limits the number of un-acked messages sent to your consumer. Manual acks mean that you tell RabbitMQ when you are done processing the message (i.e. inserting records into the database).

These concepts are explained in RabbitMQ's documentation and tutorials.

Finally, questions like this should be asked on the rabbitmq-users mailing list. More people will read it to give assistance, and others can learn from it.

jpmeijers commented 4 years ago

These concepts are explained in RabbitMQ's documentation and tutorials.

I tried reading them, but they're very difficult to understand. What I know about AMQP/RabbitMQ is what I've learned from this library's examples.

Finally, questions like this should be asked on the rabbitmq-users mailing list.

I disagree. Having to subscribe to a mailing list for a single time question doesn't seem right. I think a better place is asking the question on Stackoverflow.

Thanks a lot for your clear answer!

lukebakken commented 4 years ago

I tried reading them, but they're very difficult to understand.

If you have specific suggestions about what can be improved, we're always willing to improve our documentation. Please let us (the RabbitMQ engineering team) know what difficulty you had by either opening an issue (https://github.com/rabbitmq/rabbitmq-website/issues) or posting a message to the mailing list.

The RabbitMQ engineering team monitors the mailing list first and foremost. We check stack overflow once in a while, and the questions and discussion there are poorer quality when compared to rabbitmq-users. Stack overflow is not suited for back-and-forth discussion, either.

If you want correct answers within a reasonable time frame, the mailing list is the best place.