aio-libs / aiohttp

Asynchronous HTTP client/server framework for asyncio and Python
https://docs.aiohttp.org

Websocket reading message loop raises low-level CancelledError when connection is closed unexpectedly #2061

Open kr41 opened 7 years ago

kr41 commented 7 years ago

Actual behaviour

The message-reading loop (async for msg in ws:) raises a low-level concurrent.futures._base.CancelledError when the connection is closed unexpectedly.

Expected behaviour

Expected to get a message with type aiohttp.http_websocket.WSMsgType.ERROR, or for the loop to stop silently, or at least an aiohttp.http_websocket.WebSocketError.

Steps to reproduce

Run the following two scripts, server.py and client.py, then stop client.py with Ctrl+C.

server.py

import logging

from aiohttp import web

logger = logging.getLogger(__name__)

async def index(request):
    ws = web.WebSocketResponse()
    request.app['websockets'].add(ws)

    try:
        await ws.prepare(request)
        logger.debug('Connected')
        async for msg in ws:
            logger.info('Received: %r', msg.data)
    except Exception:
        logger.exception('Error')
    logger.debug('Disconnected')

    request.app['websockets'].discard(ws)
    return ws

async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close()
    app['websockets'].clear()

def main():
    logging.basicConfig(level=logging.DEBUG)

    app = web.Application()
    app['websockets'] = set()
    app.router.add_get('/', index)
    app.on_shutdown.append(on_shutdown)

    web.run_app(app, host='127.0.0.1', port=9000)

if __name__ == '__main__':
    main()

client.py

import asyncio

import aiohttp

async def communicate(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.ws_connect('http://127.0.0.1:9000') as ws:
            while True:
                await ws.send_str('Hello')
                await asyncio.sleep(1, loop=loop)

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(communicate(loop))

if __name__ == '__main__':
    main()

Log output of server.py

$ python server.py 
DEBUG:asyncio:Using selector: EpollSelector
======== Running on http://127.0.0.1:9000 ========
(Press CTRL+C to quit)
DEBUG:__main__:Connected
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
ERROR:__main__:Error
Traceback (most recent call last):
  File "server.py", line 16, in index
    async for msg in ws:
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 343, in __anext__
    msg = yield from self.receive()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 273, in receive
    msg = yield from self._reader.read()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 627, in read
    return (yield from super().read())
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 509, in read
    yield from self._waiter
  File "/usr/lib64/python3.5/asyncio/futures.py", line 380, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib64/python3.5/asyncio/tasks.py", line 304, in _wakeup
    future.result()
  File "/usr/lib64/python3.5/asyncio/futures.py", line 285, in result
    raise CancelledError
concurrent.futures._base.CancelledError
DEBUG:__main__:Disconnected
INFO:aiohttp.access:- - - [06/Jul/2017:11:41:25 +0000] "GET / HTTP/1.1" 101 0 "-" "Python/3.5 aiohttp/2.2.3"

Your environment

OS: CentOS Linux 7
Linux kernel: 3.10.0-514.16.1.el7.x86_64
Python: 3.5.3
aiohttp: 2.2.3

asvetlov commented 7 years ago

Technically, aiohttp creates a task per client request. On client disconnection the system stops the task ASAP. The only way to do that is to cancel the task (suppose the web handler is waiting for a response from a DB or another service: we want to cancel that too, without waiting for an explicit operation on the connection to the websocket client).

Task.cancel() works by throwing an asyncio.CancelledError exception into the task; the exception class is derived from the standard Exception. This is asyncio behavior, nothing specific to aiohttp itself.

The only thing I could suggest is catching CancelledError in your handler explicitly:

try:
    ...
except asyncio.CancelledError:
    pass
except Exception as exc:
    log(exc)

Or you could simply avoid catching a type as broad as Exception.
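The mechanism described above can be seen in isolation with a minimal sketch that uses only asyncio, no aiohttp. The handler coroutine and the events list are hypothetical stand-ins, and asyncio.sleep plays the role of the websocket read. Note that on Python 3.8 and later CancelledError derives from BaseException, so a plain except Exception no longer catches it; the explicit clause works either way.

```python
import asyncio


async def handler(events):
    # Hypothetical stand-in for a web handler blocked on `async for msg in ws`.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # re-raise so the task is still marked as cancelled
    except Exception:
        events.append("error")
    finally:
        events.append("cleanup")


async def main():
    events = []
    task = asyncio.ensure_future(handler(events))
    await asyncio.sleep(0)  # let the handler start and reach its await
    task.cancel()           # what happens on client disconnection
    try:
        await task
    except asyncio.CancelledError:
        events.append("task cancelled")
    return events


events = asyncio.run(main())
print(events)
```

The exception surfaces at whatever await the task is suspended on, which is why the traceback in the report ends inside the stream reader rather than in user code.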

fafhrd91 commented 7 years ago

I see two options:

s-kostyuk commented 6 years ago

Maybe it's better to introduce a separate ConnectionClosed exception, as was done in the websockets library?

twisteroidambassador commented 5 years ago

Technically, aiohttp creates a task per client request. On client disconnection the system stops the task ASAP. The only way to do that is to cancel the task (suppose the web handler is waiting for a response from a DB or another service: we want to cancel that too, without waiting for an explicit operation on the connection to the websocket client).

I think I have just been bitten by this behavior. I had some code like this:

import contextlib

from aiohttp import web

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())

        async for msg in ws:
            pass  # do stuff

    await ws.close()

    return ws

After putting it in production I found that the exiting part of acquire_resource_3() would be silently skipped. More logging revealed that a CancelledError was being raised inside acquire_resource_3. Here's what I think happened:

This is a really weird problem, particularly because of how it breaks the expectation that the exiting part of a context manager will always run. I basically had to shield all the async contexts from cancellation, like this:

import asyncio
import contextlib

from aiohttp import web

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Run the real work in a separate task, shielded from handler cancellation
    await asyncio.shield(asyncio.ensure_future(actually_do_stuff(ws)))

    return ws

async def actually_do_stuff(ws):
    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())

        async for msg in ws:
            pass  # do stuff

    await ws.close()
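The effect of the shield can be checked without aiohttp. The following is only a sketch under stated assumptions: the coroutine bodies, the events list and the inner_tasks list are hypothetical, and asyncio.sleep stands in for both the message loop and the asynchronous cleanup. It shows that cancelling the outer handler task leaves the inner task running, so its cleanup finishes.

```python
import asyncio


async def actually_do_stuff(events):
    # Hypothetical worker: the sleep stands in for the message loop.
    try:
        await asyncio.sleep(0.05)
    finally:
        await asyncio.sleep(0.05)  # async cleanup that needs to await
        events.append("cleanup finished")


async def handler(events, inner_tasks):
    inner = asyncio.ensure_future(actually_do_stuff(events))
    inner_tasks.append(inner)    # keep a reference to the shielded task
    await asyncio.shield(inner)  # cancelling handler does not cancel inner


async def main():
    events, inner_tasks = [], []
    outer = asyncio.ensure_future(handler(events, inner_tasks))
    await asyncio.sleep(0)  # let the handler start and reach the shield
    outer.cancel()          # simulates the client disconnecting
    try:
        await outer
    except asyncio.CancelledError:
        events.append("handler cancelled")
    await inner_tasks[0]    # the shielded task is still running; wait for it
    return events


events = asyncio.run(main())
print(events)
```

One trade-off of this design is that once the handler is cancelled nothing awaits the shielded task any more, so something else has to hold a reference to it and deal with any exception it raises.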

Is there a better way to do this?