python / asyncio

asyncio historical repository
https://docs.python.org/3/library/asyncio.html

Add draining feature to asyncio.Queue #415

Closed nhumrich closed 8 years ago

nhumrich commented 8 years ago

This feature adds the ability to drain a Queue. This is useful for cleanup steps, especially in the simple websocket case.

It adds the following things:

Imagine we have the following code:

import asyncio
q = asyncio.Queue()

async def wait_for_it():
    while True:
        t = await q.get()
        q.task_done()

loop = asyncio.get_event_loop()
asyncio.ensure_future(wait_for_it())

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

This actually produces a lot of warnings and errors on shutdown, such as:

Task was destroyed but it is pending!
task: <Task pending coro=<wait_for_it() running at test.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>
Exception ignored in: <coroutine object wait_for_it at 0x7f594abb7830>
Traceback (most recent call last):
  File "test.py", line 6, in wait_for_it
  File "/usr/lib/python3.5/asyncio/queues.py", line 170, in get
  File "/usr/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed

The fix is

import asyncio

q = asyncio.Queue()

async def wait_for_it():
    while True:
        t = await q.get()
        q.task_done()

loop = asyncio.get_event_loop()
task = loop.create_task(wait_for_it())

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    task.cancel()
    pending = asyncio.Task.all_tasks()
    try:
        loop.run_until_complete(asyncio.gather(*pending))
    except asyncio.CancelledError:
        print('expected')
    loop.close()

which is very unwieldy and not obvious.

Instead I propose a drain feature which gives us the following code

import asyncio

q = asyncio.Queue()

async def wait_for_it():
    try:
        while True:
            t = await q.get()
            q.task_done()
    except asyncio.QueueClosed:
        print('closed')

loop = asyncio.get_event_loop()
asyncio.ensure_future(wait_for_it())
asyncio.ensure_future(wait_for_it())

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.run_until_complete(q.drain())
    loop.close()

which is much cleaner and simpler.

cjrh commented 8 years ago

This feature would be greatly appreciated. Currently there are non-obvious ways to deal with graceful shutdown in get-push scenarios, e.g., see this StackOverflow question:

https://stackoverflow.com/questions/35495711/asyncio-graceful-shutdown-for-a-pushing-server

asyncio.wait_for causes excessive memory usage if the queue is infrequently used, as discussed in Issue 26259, and the solution recommended there, using get_nowait, is basically a return to polling, which introduces unnecessary latency. The problem is currently worked around by adding a special "quit" message to the queue, which you then have to detect inside the coroutine in order to shut it down. This is obviously quite ugly.

mattrasband commented 8 years ago

Definitely very excited to see this idea. There have been a few times when I wanted to use queues to hand off some work. I used the less-than-obvious approach (e.g. asyncio.gather(*pending)) that @nhumrich demonstrates, and it feels like it may be difficult to recall going forward.

gvanrossum commented 8 years ago

This looks too complicated, and I'm not convinced that this belongs in asyncio rather than in the app.

vxgmichel commented 8 years ago

I agree that the PR includes too much logic but it has an interesting point: what OP called drain_nowait wakes up any ongoing get() coroutine with an exception if the queue is empty.

I think this part could be useful, since I often end up implementing this manually by putting None into the queue to indicate that it is not going to get any more items:

    while True:
        item = await queue.get()
        if item is None:
            break
        process(item)

The alternative to this is either to cancel the coroutine that consumes the data, or use asyncio.wait with FIRST_COMPLETED to wait for queue.get() and a stop event simultaneously.
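The asyncio.wait variant mentioned here could look roughly like the following. This is only a sketch: the Stopped exception, the get_or_stop helper and the stop_event name are hypothetical, and asyncio.run assumes Python 3.7 or later.

```python
import asyncio


class Stopped(Exception):
    """Hypothetical signal: the stop event fired before an item arrived."""


async def get_or_stop(queue, stop_event):
    # Race queue.get() against stop_event.wait(); whichever finishes first wins.
    get_task = asyncio.ensure_future(queue.get())
    stop_task = asyncio.ensure_future(stop_event.wait())
    done, pending = await asyncio.wait(
        {get_task, stop_task}, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # do not leave the loser pending
    if get_task in done:
        return get_task.result()  # prefer an item if one arrived
    raise Stopped()


async def consumer(queue, stop_event, results):
    try:
        while True:
            results.append(await get_or_stop(queue, stop_event))
    except Stopped:
        pass  # normal shutdown path


async def main():
    queue = asyncio.Queue()
    stop_event = asyncio.Event()
    results = []
    workers = [asyncio.ensure_future(consumer(queue, stop_event, results))
               for _ in range(2)]
    for i in range(3):
        await queue.put(i)
    await asyncio.sleep(0.01)  # stand-in for real work; lets consumers drain
    stop_event.set()
    await asyncio.gather(*workers)
    return results
```

Each call creates two extra tasks per get(), which is the kind of overhead the later comments in this thread complain about.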

A few weeks ago, I wrote two producer/consumer examples for asyncio.readthedocs.io (https://asyncio.readthedocs.io/en/latest/producer_consumer.html#producer-consumer), one with the None item approach and the other with the cancelling approach, but I feel those workarounds make the examples less obvious than they should be.

gvanrossum commented 8 years ago

Users of the stdlib queue module have to put a None into the queue as well. I don't see why that's so difficult.

mwfrojdman commented 7 years ago

Putting Nones into the queue has the limitation that the producer has to know how many consumers there are. With zero or one consumer at a time it works, but with 2..n consumers you have to do something like wrapping the Queue.get call with asyncio.as_completed, which also waits for an event that gets set when the producer(s) stop.

I had the latter in my app to work around the 2+ consumers situation, but the amount of code changed from one line to ~ a dozen and performance took a dive (probably because of all the tasks created).
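To make the limitation concrete, here is a minimal sketch of the sentinel approach with several consumers. The function names and the n_consumers parameter are made up for illustration, and asyncio.run assumes Python 3.7 or later. The point is that the producer side must put exactly one None per consumer.

```python
import asyncio


async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is None:  # sentinel: no more items for this consumer
            break
        results.append(item)


async def main(n_consumers=3):
    queue = asyncio.Queue()
    results = []
    consumers = [asyncio.ensure_future(consumer(queue, results))
                 for _ in range(n_consumers)]
    for i in range(10):
        await queue.put(i)
    # One sentinel per consumer: with fewer, some consumers would block forever.
    for _ in range(n_consumers):
        await queue.put(None)
    await asyncio.gather(*consumers)
    return results
```

If the code that fills the queue does not know n_consumers, this pattern cannot be applied as written.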

Now I have a ClosableQueue Queue subclass with a close method that just sets a QueueClosed exception on all current getters and putters, sets the _finished event and raises the same exception if someone calls any of the instance's methods.
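A subclass along the lines described above might look something like this. It is a sketch, not the actual code from my app: it relies on the private CPython attributes _getters, _putters and _finished of asyncio.Queue, which are implementation details that may change, and the _closed flag is a name invented here.

```python
import asyncio


class QueueClosed(Exception):
    """Raised by get()/put() once the queue has been closed."""


class ClosableQueue(asyncio.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._closed = False  # invented flag, not part of asyncio.Queue

    def close(self):
        self._closed = True
        # Assumption: _getters and _putters are deques of waiting futures
        # (private CPython implementation details).
        for waiters in (self._getters, self._putters):
            while waiters:
                waiter = waiters.popleft()
                if not waiter.done():
                    waiter.set_exception(QueueClosed())
        # Assumption: _finished is the event that join() waits on.
        self._finished.set()

    async def get(self):
        if self._closed:
            raise QueueClosed()
        return await super().get()

    async def put(self, item):
        if self._closed:
            raise QueueClosed()
        await super().put(item)
```

A consumer then wraps its loop in try/except QueueClosed, and close() wakes every blocked consumer regardless of how many there are.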

+1 for having such functionality in the stdlib. Unless you control both producers and consumers, you'll end up with hacks (like .put(None)) that may or may not work, or reinventing the wheel.

gvanrossum commented 7 years ago

If you can convince the core devs that the non-asyncio queue.py needs this feature too we can add it to asyncio.

mwfrojdman commented 7 years ago

I started to hesitate thinking my use case is a special unicorn after all and not necessarily something that should be in the standard library, until I just ran into the same .put(None) in the standard library while investigating a separate issue: https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Lib/concurrent/futures/thread.py#L141

Maybe I'll draft a suggestion after all in a couple of weeks after publishing the library I'm working on now. Or if someone else feels like picking up the ball, there's one good example in the link. Googling/grepping for queue.put(None) may reveal plenty more of these.

rhettinger commented 7 years ago

I spent some time researching this and came to the conclusion that queue.close() makes sense for multiprocessing and for distributed computing, but not so much for threads and asyncio, where there isn't much benefit from having multiple consumers and where we already have other cheap ways to communicate (via a shared active/shutdown variable, via a None message in the message queue itself, via timeouts, or via a secondary command queue that could be used for a variety of restart/shutdown/pause/resume commands to consumer threads). As far as I can tell, consumer get() code written with a Shutdown exception handler isn't cleaner or faster than equivalent code that checks for None or some other sentinel.
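As an illustration of one of those alternatives, a shared active/shutdown variable combined with a timeout on get() could be sketched as follows. The state dict, the 0.05 second timeout and the function names are arbitrary choices for this example, and asyncio.run assumes Python 3.7 or later.

```python
import asyncio


async def consumer(queue, state, results):
    # Keep going while the producer is active or items are still queued.
    while state["active"] or not queue.empty():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue  # nothing arrived; re-check the shutdown flag
        results.append(item)
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    state = {"active": True}  # shared shutdown variable
    results = []
    workers = [asyncio.ensure_future(consumer(queue, state, results))
               for _ in range(2)]
    for i in range(5):
        await queue.put(i)
    await queue.join()       # wait until every item has been processed
    state["active"] = False  # consumers notice at their next timeout
    await asyncio.gather(*workers)
    return results
```

The trade-off is that shutdown is delayed by up to one timeout interval, which is the polling latency raised earlier in the thread.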

gvanrossum commented 7 years ago

Thanks for your review Raymond!