mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Blocking get aio_pika #131

Open josephbiko opened 6 years ago

josephbiko commented 6 years ago

I expected await queue.get() to be blocking in aio_pika, but even when I don't set the timeout parameter I instantly get an error:

aio_pika.exceptions.QueueEmpty

Is there any way to get a blocking get in aio_pika?

mosquito commented 6 years ago

When the queue is empty, aio_pika.exceptions.QueueEmpty is raised. That's because the AMQP server responds instantly. However, if you set prefetch_count=1 on the channel and call get while the channel still has an unacked message, the method will wait.

aio-pika is a wrapper around pika, and pika is a wrapper around the AMQP protocol, so it does not change the server-client interaction.

josephbiko commented 6 years ago

Is there also a way to just wait until a message is available? Or do I need to sleep and check until something is available?

mosquito commented 6 years ago

You should call consume on the queue. This example makes it easier.

mosquito commented 6 years ago

This example demonstrates asynchronous message processing.
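
A minimal sketch of what consuming through the queue iterator can look like; it is not the linked example itself. The helper name process_all and the handler argument are hypothetical, and the sketch assumes a recent aio_pika version in which queue.iterator() and message.process() are async context managers.

```python
import asyncio


async def process_all(queue, handler):
    """Consume messages from `queue` and pass each body to `handler`.

    `queue` is assumed to be an aio_pika queue (or any object with the same
    iterator() interface); `handler` is a hypothetical coroutine function.
    """
    # iterator() registers a consumer, so the broker pushes messages as they
    # arrive instead of being polled with basic.get.
    async with queue.iterator() as messages:
        async for message in messages:
            # process() acks on normal exit and rejects if the handler raises.
            async with message.process():
                await handler(message.body)
```

With a real broker the loop keeps waiting for new messages until the iterator is closed or the surrounding task is cancelled.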

josephbiko commented 6 years ago

Yes, but this is a normal consumer, I just want to retrieve a single message from the queue

mosquito commented 6 years ago

The basic.get method is marked as synchronous in the documentation (page 54):

Synchronous: Yes; expected response is from method(s) basic.get-ok, basic.get-empty

You should use this method only in very specific cases.

josephbiko commented 6 years ago

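Allow me to illustrate. What I do now is:

    # Poll once per second until a message arrives.
    while True:
        # fail=False returns None instead of raising QueueEmpty
        msg = await q.get(fail=False)
        if msg:
            break
        await asyncio.sleep(1)
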
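
The polling loop above could be wrapped in a small helper with an overall deadline, roughly as follows. This is only a sketch: poll_for_message, interval, and timeout are hypothetical names, and the only aio_pika call it relies on is queue.get(fail=False), which returns None when the queue is empty.

```python
import asyncio


async def poll_for_message(queue, interval=1.0, timeout=None):
    """Poll `queue` until a message is available, then return it.

    Raises asyncio.TimeoutError if `timeout` seconds pass without a message.
    """

    async def _poll():
        while True:
            # With fail=False an empty queue yields None instead of QueueEmpty.
            message = await queue.get(fail=False)
            if message is not None:
                return message
            # Yield to the event loop between attempts.
            await asyncio.sleep(interval)

    # timeout=None waits indefinitely.
    return await asyncio.wait_for(_poll(), timeout)
```

Each attempt is still a separate basic.get round trip to the broker, so a short interval trades latency for extra traffic.
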
nurettin commented 5 years ago

If we use consume, it will just get messages regardless of whether the consume callback has finished its work or not. This is undesirable when your consumer cannot work concurrently. Ideally, this would do nicely:

consume(callback, wait_until_callback_finishes_before_receiving_any_more_messages=True)

Edit: Never mind, I thought I had tested with prefetch_count=1 but it was probably left at the default. It's working fine because aio-pika doesn't receive any new messages until you ack or reject.

mosquito commented 5 years ago

@nurettin just call set_qos(prefetch_count=1) and stop consuming before you ack
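
One way to read this advice as a "blocking get" is sketched below. The helper name blocking_get and its timeout argument are hypothetical; the sketch assumes a recent aio_pika version in which channel.set_qos(), queue.consume(), and queue.cancel() are coroutines, consume() returns a consumer tag, and the callback may be a coroutine function.

```python
import asyncio


async def blocking_get(channel, queue, timeout=None):
    """Wait until one message arrives on `queue` and return it unacked."""
    # Allow at most one unacked message to be delivered at a time.
    await channel.set_qos(prefetch_count=1)

    first = asyncio.get_running_loop().create_future()

    async def on_message(message):
        # Keep only the first delivery.
        if not first.done():
            first.set_result(message)

    consumer_tag = await queue.consume(on_message)
    try:
        return await asyncio.wait_for(first, timeout)
    finally:
        # Stop consuming before the caller acks, so nothing else is pushed.
        await queue.cancel(consumer_tag)
```

The returned message has not been acknowledged, so the caller is expected to ack or reject it once the work is done.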