Closed timofurrer closed 5 years ago
Yes this is currently our main design problem. Using callback to process messages from RabbitMQ was not a good choice. We're looking into it to find a better API.
Okay, thanks! I could solve this issue for know with opening multiple connection and having multiple channels.
Hi, as this isn't moving forward and I'm interested in a solution (for now I'm solving this at application level), I want to propose an API for message consumption. I don't know the code base of the library so I don't know if it is difficult to implement but I think that this API is cleaner and better.
The first thing is the external API, it would be nice to have an async iterator for consuming messages, something like:
consumer = await channel.basic_consume(
queue_name=queue_name,
no_ack=True,
consumer_tag=consumer_tag
)
async for message in consumer:
# do whatever with the message
pass
# message can be a Message object that contains as attributes the tuple of
# "channel, body, envelope, properties" that now is passed to
# the callback or directly return that as a tuple, so it will be
async for channel, body, envelope, properties in consumer:
# do whatever with the message
pass
The consumer returned by channel.basic_consume
should be a Consumer object implementing the async iterator protocol that polls from a asyncio.Queue
that have the messages inside, that queue should be populated by another asyncio.Task
that would be the method that consumes from the RabbitMQ and dispatch the message to the correct asyncio.Queue
.
At the end you have a asyncio.Task
per channel getting messages and dispatching them to queues for consumers so the consumer Task of the library is separated from the Tasks of the user of the library.
I think that most of the changes goes in channel.basic_consume
and channel.basic_deliver
also the channel.consumer_callbacks
attribute should be changed to channel.consumer_queues
I think that if you guys at Polyconseil agree, and is not too difficult to implement, I can implement that.
Going from the RPC example ... Let's assume I want to call my caller again in the
on_request
callback and wait for the response. How can I achieve this without a deadlock?