Polyconseil / aioamqp

AMQP implementation using asyncio

set_result on canceled future #34

Open jcarmena opened 9 years ago

jcarmena commented 9 years ago

The asyncio documentation says:

Don’t call set_result() or set_exception() method of Future if the future is cancelled: it would fail with an exception. For example, write:

if not fut.cancelled():
    fut.set_result('done')

The future state is not checked and breaks (I don't know how the future gets cancelled, yet) :

error on dispatch
Traceback (most recent call last):
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 196, in run
    yield from self.dispatch_frame()
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 177, in dispatch_frame
    yield from self.channels[frame.channel].dispatch_frame(frame)
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 91, in dispatch_frame
    yield from methods[(frame.class_id, frame.method_id)](frame)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 84, in coro
    res = func(*args, **kw)
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 120, in open_ok
    fut.set_result(True)
  File "/usr/lib/python3.4/asyncio/futures.py", line 298, in set_result
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
asyncio.futures.InvalidStateError: CANCELLED: Future<CANCELLED>
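
A minimal standalone sketch of the failure in the traceback above and of the guard the documentation recommends. It uses plain asyncio only, not aioamqp, and the helper name `set_result_if_pending` is hypothetical. It is written with async/await and `asyncio.run`, so it assumes a newer Python than the 3.4 shown in the traceback.

```python
import asyncio


def set_result_if_pending(fut, value):
    """Set the result only if the future can still accept one.

    Hypothetical helper, not part of aioamqp.
    Returns True if the result was set, False otherwise.
    """
    if fut.done():  # True for cancelled futures and for futures already resolved
        return False
    fut.set_result(value)
    return True


async def demo():
    loop = asyncio.get_running_loop()

    # Unguarded call on a cancelled future, as in open_ok in the traceback.
    fut = loop.create_future()
    fut.cancel()
    try:
        fut.set_result(True)
        raised = False
    except asyncio.InvalidStateError:
        raised = True

    # Guarded calls: one on a cancelled future, one on a pending future.
    cancelled = loop.create_future()
    cancelled.cancel()
    pending = loop.create_future()
    return (
        raised,
        set_result_if_pending(cancelled, True),
        set_result_if_pending(pending, True),
    )


result = asyncio.run(demo())
print(result)
```

Checking `done()` rather than only `cancelled()` also covers a future that already holds a result, which would raise the same `InvalidStateError`.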

dzen commented 9 years ago

Thank you for this bug report.

The open question is why the future was cancelled, but this shows that we need to check futures more carefully.

Can you give more details on how you get this error? Under what conditions? Thank you.

jcarmena commented 9 years ago

Sure. I'm using aioamqp inside a web app to spawn background tasks in another machine. I create a new channel in each request and then I publish to an exchange, like this:

channel = yield from protocol.channel()
yield from channel.publish("test", exchange_name='workers', routing_key='task1')

The issue comes when I do localhost ApacheBench tests with -n 10000 and -c 1000:

$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1528965 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
apr_socket_recv: Connection reset by peer (104)
Total of 9873 requests completed

I think the -n and -c values needed will differ from machine to machine. I have reduced the code to 1) get request, 2) publish to exchange, but the issue remains.

Regards

dzen commented 9 years ago

Is there anything interesting in your AMQP broker's log? What does it say?

jcarmena commented 9 years ago

It's RabbitMQ. Nothing interesting there, only open/close connections.

I have noticed that it does not happen every time (although ApacheBench always fails with "apr_socket_recv: Connection reset by peer (104)"), so it seems that when the HTTP server breaks, the future is cancelled and some kind of race condition follows.

Perhaps it's not your fault and you only need to check the future's state so the library behaves gracefully when the program breaks.
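
One way a future can end up cancelled without anyone calling `cancel()` on it directly: cancelling a task also cancels the future that task is currently awaiting. The sketch below shows this with plain asyncio. The assumption that the HTTP server cancels the request handler when the client connection is reset is not confirmed anywhere in this thread, and `waiter` and `request_handler` are illustrative names only.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    # Stands in for a library-side waiter, e.g. the one resolved by open_ok.
    waiter = loop.create_future()

    async def request_handler():
        # The handler is parked here, waiting for the broker's reply.
        await waiter

    task = asyncio.ensure_future(request_handler())
    await asyncio.sleep(0)  # let the handler start and block on the waiter

    # Assumed trigger: the server cancels the handler (e.g. client went away).
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # The cancellation propagated to the future the task was awaiting.
    return waiter.cancelled()


waiter_was_cancelled = asyncio.run(demo())
print(waiter_was_cancelled)
```

If the broker's reply arrives after this point, an unguarded `set_result()` on `waiter` would raise `InvalidStateError`, which matches the race described above.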

dzen commented 9 years ago

I'll try something similar tomorrow afternoon. Which asyncio HTTP server are you using? aiohttp.web?

jcarmena commented 9 years ago

Yes, it's aiohttp. This minimal code fails too:

import asyncio
import aioamqp
import textwrap
from aiohttp.web import Application, Response, StreamResponse

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", help="Port")
args = parser.parse_args()
port = args.port or 8080

def index(request):
    channel = yield from protocol.channel()
    yield from channel.publish("test", exchange_name='workers', routing_key='work1')
    return Response(body=b'OK')

@asyncio.coroutine
def init(loop):
    global transport, protocol
    transport, protocol = yield from aioamqp.connect()

    app = Application(loop=loop)
    app.router.add_route('GET', '/', index)

    handler = app.make_handler()
    srv = yield from loop.create_server(handler, 'localhost', port)
    print("Server started at http://localhost:" + str(port))
    return srv, handler

transport, protocol = None, None
loop = asyncio.get_event_loop()
srv, handler = loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(handler.finish_connections())

Exchanges and queues are created beforehand and are durable.

mwfrojdman commented 9 years ago

index() creates a new channel on every request but never closes any of them. Does the problem still reproduce with a yield from channel.close() before return Response()?

This doesn't sound like abuse of the protocol, since the server gladly creates the new channels, but it might be related to the error.

dzen commented 9 years ago

Hello,

I used your script on my machine, I set index to be a coroutine, and added a yield from channel.close() in this coroutine.

results:

$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1604373 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests

Server Software:
Server Hostname:        localhost
Server Port:            8080

Document Path:          /
Document Length:        2 bytes

Concurrency Level:      1000
Time taken for tests:   8.997 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1310000 bytes
HTML transferred:       20000 bytes
Requests per second:    1111.43 #/sec
Time per request:       899.741 ms
Time per request:       0.900 [ms] (mean, across all concurrent requests)
Transfer rate:          142.18 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0   491 1083.9      0  7013
Processing:    18   213  590.8    106  6504
Waiting:       18   209  590.8    102  6503
Total:         18   703 1408.4    115  8982

Percentage of the requests served within a certain time (ms)
  50%    115
  66%    165
  75%   1105
  80%   1118
  90%   1315
  95%   3069
  98%   7494
  99%   8925
 100%   8982 (longest request)

jcarmena commented 9 years ago

Argh, I forgot channel.close(), but it still fails. Try it with higher numbers.

dzen commented 9 years ago

Hello again,

I ran 'ab' with :

 $ ab -n 1000000 -c 10000 localhost:8080/

apr_socket_recv: Connection reset by peer (104)
Total of 69595 requests completed

And got an error when writing the frame:

Error handling request
Traceback (most recent call last):
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/server.py", line 272, in start
    yield from self.handle_request(message, payload)
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/web.py", line 85, in handle_request
    resp = yield from handler(request)
  File "ai.py", line 16, in index
    channel = yield from protocol.channel()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/protocol.py", line 306, in channel
    yield from channel.open()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 127, in open
    yield from self._write_frame(frame, request, no_wait=False, timeout=timeout, no_check_open=True)
  File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 111, in _write_frame
    frame.write_frame(request)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/frame.py", line 385, in write_frame
    header = struct.pack('!BHI', self.frame_type, self.channel, payload.tell() + len(content_header))
struct.error: 'H' format requires 0 <= number <= 65535

I must check the docs to verify the frame handling. Can you test against the latest master?
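
The `struct.error` above can be reproduced in isolation. In the `'!BHI'` format from the traceback, the second field (`self.channel`) is packed as `H`, an unsigned 16-bit integer, so any channel number above 65535 cannot be encoded. The sketch below uses only the standard `struct` module; `pack_header` is a hypothetical helper, not aioamqp's `write_frame`.

```python
import struct

MAX_CHANNEL_ID = 0xFFFF  # largest value that fits the 'H' (unsigned 16-bit) field
FRAME_TYPE_METHOD = 1    # AMQP 0-9-1 frame type for method frames


def pack_header(channel_id, payload_size):
    """Pack frame type, channel id and payload size (hypothetical helper)."""
    return struct.pack('!BHI', FRAME_TYPE_METHOD, channel_id, payload_size)


# The highest valid channel id still packs into a 7-byte header (1 + 2 + 4).
header = pack_header(MAX_CHANNEL_ID, 0)
print(len(header))

# One past the limit fails the same way as in the traceback.
try:
    pack_header(MAX_CHANNEL_ID + 1, 0)
    overflowed = False
except struct.error:
    overflowed = True
print(overflowed)
```

With a new channel opened on every request and no id reuse, a long enough benchmark run would eventually push the channel number past this limit.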

dzen commented 9 years ago

aioamqp cannot reuse previous channel ids for now.

I created issue #36
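
As an illustration of what reusing channel ids could look like, here is a minimal sketch of an id pool that hands back released ids before allocating new ones. The class `ChannelIdPool` and its methods are hypothetical; this is not the design from issue #36, only one simple way to keep ids within the 16-bit range.

```python
class ChannelIdPool:
    """Hands out channel ids and reuses the ones released by closed channels.

    Hypothetical sketch, not aioamqp code.
    """

    def __init__(self, max_id=0xFFFF):
        self._max_id = max_id
        self._next_id = 1    # channel 0 is reserved for connection-level frames
        self._released = []  # ids returned by closed channels

    def acquire(self):
        # Prefer an id that was handed back over allocating a fresh one.
        if self._released:
            return self._released.pop()
        if self._next_id > self._max_id:
            raise RuntimeError('no free channel id')
        channel_id = self._next_id
        self._next_id += 1
        return channel_id

    def release(self, channel_id):
        self._released.append(channel_id)


pool = ChannelIdPool(max_id=2)  # tiny limit to make exhaustion visible
first = pool.acquire()
second = pool.acquire()
try:
    pool.acquire()
    exhausted = False
except RuntimeError:
    exhausted = True

pool.release(first)      # closing a channel frees its id
reused = pool.acquire()  # the freed id is handed out again
print(first, second, exhausted, reused)
```

Ids only return to the pool when a channel is closed, so a handler that never calls channel.close() would still exhaust it.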

jcarmena commented 9 years ago

The latest master keeps failing. Remember that it does not show the error every time; it appears after two or three tests.

dzen commented 9 years ago

I'll retest this once the library reuses channel ids.

ariddell commented 9 years ago

I also encountered this. (Or at least I think I did.) I take it that fixing the problem is more complicated than just adding a check if fut.cancelled() in channel.py? https://docs.python.org/3/library/asyncio-dev.html?highlight=cancelled#cancellation

ariddell commented 8 years ago

Here's a log where the problem occurs. It looks like self._get_waiter('close') in channel.py is canceled, so self._get_waiter('close').set_result(True) raises an exception.

Dec 28 08:51:02 etna docker[25369]: ERROR:aioamqp.protocol:error on dispatch
Dec 28 08:51:02 etna docker[25369]: Traceback (most recent call last):
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 256, in run
Dec 28 08:51:02 etna docker[25369]: yield from self.dispatch_frame()
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 211, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from self.channels[frame.channel].dispatch_frame(frame)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 110, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from methods[(frame.class_id, frame.method_id)](frame)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/coroutines.py", line 206, in coro
Dec 28 08:51:02 etna docker[25369]: res = func(*args, **kw)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 165, in close_ok
Dec 28 08:51:02 etna docker[25369]: self._get_waiter('close').set_result(True)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/futures.py", line 329, in set_result
Dec 28 08:51:02 etna docker[25369]: raise InvalidStateError('{}: {!r}'.format(self._state, self))
Dec 28 08:51:02 etna docker[25369]: asyncio.futures.InvalidStateError: CANCELLED: <Future cancelled>

I think this might be occurring in a case where there are two attempts to close the channel.
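
To make the suspected sequence concrete, here is a toy model of a close waiter that gets cancelled before the close-ok reply arrives, followed by a second close attempt. `ToyChannel` is hypothetical and much simpler than aioamqp's channel; the timeout is just one assumed way for the waiter to be cancelled.

```python
import asyncio


class ToyChannel:
    """Hypothetical stand-in for a channel; not aioamqp's Channel class."""

    def __init__(self):
        self.is_open = True
        self._close_waiter = None

    async def close(self, timeout=None):
        if not self.is_open:
            return False  # already closed or closing: nothing to do
        self.is_open = False
        self._close_waiter = asyncio.get_running_loop().create_future()
        # A real client would send the close frame to the broker here.
        await asyncio.wait_for(self._close_waiter, timeout)
        return True

    def close_ok(self):
        # Called when the broker's reply arrives; tolerate a dead waiter.
        waiter = self._close_waiter
        if waiter is not None and not waiter.done():
            waiter.set_result(True)


async def demo():
    channel = ToyChannel()
    try:
        await channel.close(timeout=0.01)  # no reply arrives in time
    except asyncio.TimeoutError:
        pass
    waiter_cancelled = channel._close_waiter.cancelled()
    channel.close_ok()                     # late reply: ignored, no exception
    second_close = await channel.close()   # second attempt is a no-op
    return waiter_cancelled, second_close


outcome = asyncio.run(demo())
print(outcome)
```

Without the `done()` check in `close_ok`, the late reply would raise the same `InvalidStateError` as in the log above.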

dzen commented 8 years ago

Hello @ariddell,

Would you please paste some code? It seems you had already closed the channel?

ariddell commented 8 years ago

I'm not doing anything sophisticated, just a simple RPC setup; no multi-threading just asyncio. If I do have two coroutines that both close the connection/channel there shouldn't be an error, right?

I'll see if I can't figure out a way to reproduce the error.

dzen commented 8 years ago

I have a few days to have a look right now. I can push a branch with a fix, but I would like to know how you're using aioamqp and how you're triggering this bug.

Thank you

ariddell commented 8 years ago

I'm pretty sure I'm calling close on the connection and then close on a channel (associated with the connection). I know this is wrong but I think aioamqp might want to check on the future being cancelled.

In case you're looking for prior art, here is how aiohttp closes a websocket -- They have a _closed variable that tracks state. And they return False if the connection is already closed.

https://github.com/KeepSafe/aiohttp/blob/e09b86204c9099389c530b2886770e0060a05f63/aiohttp/web_ws.py#L174

    @asyncio.coroutine
    def close(self, *, code=1000, message=b''):
        if self._writer is None:
            raise RuntimeError('Call .prepare() first')

        if not self._closed:
            self._closed = True
            try:
                self._writer.close(code, message)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = 1006
                raise
            except Exception as exc:
                self._close_code = 1006
                self._exception = exc
                return True

            if self._closing:
                return True

            while True:
                try:
                    msg = yield from asyncio.wait_for(
                        self._reader.read(),
                        timeout=self._timeout, loop=self._loop)
                except asyncio.CancelledError:
                    self._close_code = 1006
                    raise
                except Exception as exc:
                    self._close_code = 1006
                    self._exception = exc
                    return True

                if msg.tp == MsgType.close:
                    self._close_code = msg.data
                    return True
        else:
            return False

dzen commented 8 years ago

Hello @ariddell.

In aioamqp, the code is a little different: you get an exception when the confirmation from the server is received (https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.close-ok), but by then the whole channel is already marked as closed.

Could you please tell me how you're triggering this behaviour? I'll dive into it and probably rework the way we close the channel.

Thank you.

ariddell commented 8 years ago

I don't know how the exception is happening. I think it's something in a finally clause so it's not affecting my application. I'll keep you posted. Thanks for your work on this!