benjamin-hodgson / asynqp

An AMQP library for asyncio
MIT License

Coroutines crash when they are cancelled #52

Open tvoinarovskyi opened 9 years ago

tvoinarovskyi commented 9 years ago

Basically the Synchroniser must discard cancelled futures. Example:

import asyncio
import asynqp

@asyncio.coroutine
def main_coro(loop):
    # connect to the RabbitMQ broker
    connection = yield from asynqp.connect(
        'localhost', 5672, username='guest', password='guest')

    # Open a communications channel
    channel = yield from connection.open_channel()
    queue = yield from channel.declare_queue('test.queue')

    get_task = asyncio.ensure_future(queue.get())
    print("It can go up to here")
    # We have a bad connection, so the get times out.
    try:
        yield from asyncio.wait_for(get_task, timeout=0.0001)
    except asyncio.TimeoutError:
        pass

    # The next get will hang because the future was not cleaned up in the Synchroniser
    yield from queue.get()
    print("but hangs here =(")

    yield from channel.close()
    yield from connection.close()

def main():
    loop = asyncio.get_event_loop()
    main_task = asyncio.ensure_future(main_coro(loop))
    try:
        loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        main_task.cancel()
        loop.run_until_complete(main_task)

if __name__ == "__main__":
    main()
tvoinarovskyi commented 9 years ago

It's an easy fix after #49, but I think it needs some decent tests. It's easy to run into this issue when reconnecting (you will want a wait_for wrapper just in case).
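
To illustrate what "discard cancelled futures" could look like, here is a minimal sketch. It is not asynqp's actual Synchroniser: the class name SketchSynchroniser and the methods await_reply and notify are hypothetical and exist only for this example. The one asyncio fact it relies on is that calling set_result() on a cancelled future raises InvalidStateError, so the notifier has to skip futures that are already done.

```python
import asyncio
from collections import deque


class SketchSynchroniser:
    """Hypothetical stand-in for illustration, NOT asynqp's real Synchroniser."""

    def __init__(self, loop):
        self._loop = loop
        self._waiters = deque()  # futures waiting for a reply, oldest first

    def await_reply(self):
        # Register a new waiter and hand its future back to the caller.
        fut = self._loop.create_future()
        self._waiters.append(fut)
        return fut

    def notify(self, reply):
        # Discard waiters whose caller gave up (cancelled or otherwise done);
        # calling set_result() on them would raise InvalidStateError.
        while self._waiters and self._waiters[0].done():
            self._waiters.popleft()
        if not self._waiters:
            return False  # nobody is waiting for this reply
        self._waiters.popleft().set_result(reply)
        return True
```

Whether a reply that arrives after a cancellation should go to the next waiter (as in this sketch) or be dropped depends on the protocol exchange involved; the sketch only shows the skipping step.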

benjamin-hodgson commented 9 years ago

Thanks for the bug report. I can probably take a look at this tomorrow.

tvoinarovskyi commented 9 years ago

OK, it seems harder than I thought it would be... Even after I patched the synchroniser it still fails, because reader.ready() is not called. We could change the code so that reader.ready() is called even if the coroutine is cancelled, but I find that disturbing... I thought it would be as easy a fix as this commit, but it doesn't work =(
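
For reference, the "call reader.ready() even if the coroutine is cancelled" idea can be sketched with try/finally. This is only an illustration under assumptions: FakeReader and wait_for_reply are hypothetical names, not asynqp code, and the sketch uses the newer async/await syntax rather than @asyncio.coroutine. It shows the mechanism only and says nothing about whether this is the right fix for the library.

```python
import asyncio


class FakeReader:
    """Hypothetical reader that only counts ready() calls."""

    def __init__(self):
        self.ready_calls = 0

    def ready(self):
        self.ready_calls += 1


async def wait_for_reply(reply_future, reader):
    # Hypothetical helper: wait for the reply, and signal the reader
    # afterwards whether the wait completed, failed, or was cancelled.
    try:
        return await reply_future
    finally:
        reader.ready()
```

Because the finally clause also runs when CancelledError is thrown into the coroutine, a caller that times out through wait_for still triggers reader.ready().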