Open lumasepa opened 7 years ago
Hello @lumasepa
Thank you for this huge contribution. Using an asyncio.Queue was the first design of aioamqp, and I removed it.
The first thing is that under high load, RabbitMQ will fill your entire RAM with messages. How can we correctly avoid that?
I know the problem, you are right. What about using a limited-size Queue (the size can be a parameter of the basic_consume
method), and if it fills up we can use AMQP basic.reject
with requeue set to true? The standard describes basic.reject
in a way that I think can solve this problem.
What do you think about it, @dzen ?
Another option is a limited-size Queue that raises an exception when it is full. That lets the user choose what to do in this situation, for example stop consuming or stop the flow of the channel.
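A minimal sketch of the reject-on-full idea discussed above, assuming a channel object that exposes a `basic_reject(delivery_tag, requeue=...)` coroutine (stubbed here so the example runs without a broker; the real aioamqp call may differ):

```python
import asyncio


class StubChannel:
    """Stand-in for an AMQP channel; only records rejections."""

    def __init__(self):
        self.rejected = []

    async def basic_reject(self, delivery_tag, requeue=True):
        # A real channel would send basic.reject to the broker here.
        self.rejected.append((delivery_tag, requeue))


async def on_message(queue, channel, body, delivery_tag):
    """Enqueue the message, or reject-with-requeue when the bounded queue is full."""
    try:
        queue.put_nowait((body, delivery_tag))
    except asyncio.QueueFull:
        # Queue is full: push the message back to the broker for redelivery.
        await channel.basic_reject(delivery_tag, requeue=True)


async def demo():
    channel = StubChannel()
    queue = asyncio.Queue(maxsize=2)  # maxsize could be a basic_consume parameter
    for tag, body in enumerate([b"a", b"b", b"c"]):
        await on_message(queue, channel, body, tag)
    return queue.qsize(), channel.rejected


print(asyncio.run(demo()))  # → (2, [(2, True)])
```

The third message overflows the two-slot queue and is rejected with `requeue=True`, so the broker keeps it instead of the client buffering it in RAM.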
Hello @lumasepa, I'm sorry for the late reply. @dzen and I talked about your PR and we have a few comments:

`fetch_message()` and `get_message()` calls. Why not wait and return the message directly? Here's what that would look like:

```python
for channel, body, envelope, properties in (yield from consumer.fetch_message()):
    print(body)
```

Better yet, maybe it's possible to `yield from` the consumer object directly. That would be even more user friendly:

```python
for channel, body, envelope, properties in (yield from consumer):
    print(body)
```

`basic_consume()` method, that'd be great. Though if it makes things way too complex, we could forget about it as aioamqp is still in the pre-1.0 phase.

Hello, I'm going to answer your points in order:
```python
message = yield from consumer.fetch_message()
for channel, body, envelope, properties in message:
    print(body)
```

As you can see, the asynchronous operation is done only once (not each time the for loop iterates), so it is not the same behaviour as the `while` loop, `fetch_message`, and `get_message` APIs have. The `__aiter__` and `__anext__` protocol, in conjunction with `async for`, was created in Python 3.5 for exactly this pattern.
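The `__aiter__`/`__anext__` pattern mentioned above can be sketched like this; the `Consumer` below is a hypothetical stand-in that drains a pre-filled queue, not aioamqp's actual class:

```python
import asyncio


class Consumer:
    """Hypothetical consumer usable with `async for` (Python 3.5+)."""

    def __init__(self, messages):
        self._queue = asyncio.Queue()
        for m in messages:
            self._queue.put_nowait(m)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # A real consumer would await the next delivery from the broker;
        # here we stop once the pre-filled queue is drained.
        if self._queue.empty():
            raise StopAsyncIteration
        return await self._queue.get()


async def main():
    consumer = Consumer([b"a", b"b"])
    out = []
    async for body in consumer:  # the await happens on every iteration
        out.append(body)
    return out


print(asyncio.run(main()))  # → [b'a', b'b']
```

Unlike a plain `for` over an awaited result, `async for` suspends on each iteration, which is the behaviour the fetch-style API needs.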
`async` and `await` in the `Consumer` class.

`qos_prefetch` is the number of messages that you can get from the broker without acknowledgement. So if I pass 10 as the `qos_prefetch` parameter, the broker can send the consumer 10 messages; once the consumer holds 10 unacknowledged messages, the broker stops sending, and each time the consumer acks one of them the broker can send one more. If the `basic.consume` is declared with the `no-ack` flag, this is not applicable. And if we limit the queue to the prefetch limit, it can happen that the consume is `no-ack` and we get more messages than the queue supports, and that's a problem. I agree with you that we should restrict the size of the Queue; the question is what to do when the queue is full. Maybe I'm missing something on this point; if that's the case, can you explain it better?
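To make the flow control concrete, here is a toy model of the `qos_prefetch` bookkeeping described above (plain Python, not a real AMQP implementation):

```python
class PrefetchBroker:
    """Toy model of qos_prefetch flow control between broker and consumer."""

    def __init__(self, prefetch):
        self.prefetch = prefetch  # max unacknowledged messages in flight
        self.unacked = 0
        self.pending = []

    def publish(self, msg):
        self.pending.append(msg)

    def deliver(self):
        """Deliver messages only while the unacked count is below the limit."""
        delivered = []
        while self.pending and self.unacked < self.prefetch:
            delivered.append(self.pending.pop(0))
            self.unacked += 1
        return delivered

    def ack(self):
        # Each ack frees one delivery slot.
        self.unacked -= 1


broker = PrefetchBroker(prefetch=2)
for m in range(5):
    broker.publish(m)
print(broker.deliver())  # → [0, 1] : delivery stops at the prefetch limit
broker.ack()             # acking one message frees one slot
print(broker.deliver())  # → [2]
```

With `no-ack`, no acks ever arrive, so this accounting never applies and the broker sends as fast as it can, which is why the prefetch limit alone cannot bound the client-side queue.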
but if we maintain the `basic_consume()` callback API we are not fixing the problem completely. If you really want to maintain the callback API, we can create a new method `create_consumer()` that has the new API and returns a `Consumer` object.

hello, what is the status of this PR ?
Hello there! I am working on https://github.com/aio-libs/aioamqp_consumer as a separate project, please take a look at it!
Maybe we can cooperate on a more feature-rich project together!
First implementation of a new consumer API, as I proposed in #87.
I would like a review from polyconseil and some feedback; I think this API can be a good one.