Closed schmir closed 11 years ago
Any idea how I can reliably reproduce that? (or how to fix it / where the error lies)
I don't know how to reliably reproduce that. It feels like it fails 1 out of 30 times. The above traceback contains the values of local variables. Doesn't that give you a hint? (I know basically nothing about the wire protocol, so I don't even have a good guess what the problem may be)
I've added some print statements.
diff --git a/puka/promise.py b/puka/promise.py
index 95d3d55..9e4b4e3 100644
--- a/puka/promise.py
+++ b/puka/promise.py
@@ -82,6 +82,7 @@ class Promise(object):
         self.done(result)
 
     def recv_method(self, result):
+        print "RECV:", self.number, type(result), result
         # log.debug('#%i recv_method %r', self.number, result)
         # In this order, to allow callback to re-register to the same method.
         callback = self.methods[result.method_id]
diff --git a/tests/test_basic.py b/tests/test_basic.py
index 1ed5fdd..d4bbb07 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -419,11 +419,16 @@ class TestBasic(base.TestCase):
         client.wait(promise)
 
         consume_promise = client.basic_consume(queue=self.name)
+        print "CONSUME_PROMISE:", consume_promise
         msg_result = client.wait(consume_promise)
 
         promise = client.queue_delete(self.name)
+        # print "PROMISE:", promise
         client.wait(promise)
 
+        # client.wait(consume_promise)
+
+        print "QUEUE DELETED"
         promise = client.close()
         client.wait(promise)
With a successful run of test_basic I get the following output:
RECV: 1 <class 'puka.spec.FrameConnectionStart'> {'server_properties': {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2012 VMware, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'consumer_cancel_notify': True, 'publisher_confirms': True, 'basic.nack': True}, 'platform': 'Erlang/OTP', 'version': '2.8.6'}, 'version_minor': 9, 'mechanisms': 'PLAIN AMQPLAIN', 'locales': 'en_US', 'version_major': 0}
RECV: 1 <class 'puka.spec.FrameConnectionTune'> {'frame_max': 131072, 'channel_max': 0, 'heartbeat': 0}
RECV: 1 <class 'puka.spec.FrameConnectionOpenOk'> {'known_hosts': ''}
RECV: 2 <class 'puka.spec.FrameChannelOpenOk'> {'channel_id': ''}
RECV: 3 <class 'puka.spec.FrameChannelOpenOk'> {'channel_id': ''}
RECV: 2 <class 'puka.spec.FrameConfirmSelectOk'> {}
RECV: 3 <class 'puka.spec.FrameQueueDeclareOk'> {'queue': 'test0.297503201551', 'message_count': 0, 'consumer_count': 0}
RECV: 2 <class 'puka.spec.FrameBasicAck'> {'multiple': False, 'delivery_tag': 1}
CONSUME_PROMISE: 5
RECV: 5 <class 'puka.spec.FrameBasicQosOk'> {}
RECV: 5 <class 'puka.spec.FrameBasicConsumeOk'> {'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5'}
RECV: 5 <class 'puka.spec.FrameBasicDeliver'> {'body': '0.22912621845', 'exchange': '', 'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5', 'routing_key': 'test0.297503201551', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}
RECV: 6 <class 'puka.spec.FrameChannelOpenOk'> {'channel_id': ''}
RECV: 6 <class 'puka.spec.FrameQueueDeleteOk'> {'message_count': 0}
RECV: 5 <class 'puka.spec.FrameBasicCancel'> {'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5', 'nowait': True}
QUEUE DELETED
RECV: 5 <class 'puka.spec.FrameBasicCancelOk'> {'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5'}
RECV: 1 <class 'puka.spec.FrameConnectionCloseOk'> {}
On errors I get
...
QUEUE DELETED
RECV: 1 <class 'puka.spec.FrameConnectionCloseOk'> {}
RECV: 5 <class 'puka.spec.FrameBasicCancelOk'> {'consumer_tag': 'amq.ctag-wgB5BfsORMGmQVZrhfsECx'}
i.e. the client receives the FrameBasicCancelOk (for the consume_promise) after the FrameConnectionCloseOk.
Handling of FrameConnectionCloseOk involves calling the Connection._shutdown method, which marks the consume_promise as done.
Later on it can't handle the FrameBasicCancelOk frame...
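To make the ordering problem concrete, here is a minimal toy model in Python 3. It does not use puka's real classes: `ToyConnection`, `replay` and the frame tuples are hypothetical stand-ins, and the `KeyError` is only how this toy fails, not necessarily what the real traceback shows. It assumes, as described above, that handling the close-ok frame finishes and forgets every outstanding promise.

```python
# Toy model of the race (hypothetical, not puka's real classes).
# Frames are (promise_number, frame_name) pairs handled in arrival order.

class ToyConnection:
    def __init__(self):
        # promise number -> list of frame names delivered to it
        self.promises = {1: [], 5: []}
        self.closed = False

    def shutdown(self):
        # Stand-in for what Connection._shutdown is described as doing:
        # every outstanding promise is marked done and forgotten.
        self.promises.clear()
        self.closed = True

    def handle(self, number, frame):
        promise = self.promises[number]  # KeyError if the promise is gone
        promise.append(frame)
        if frame == "ConnectionCloseOk":
            self.shutdown()


def replay(frames):
    """Feed frames to a fresh toy connection and return it."""
    conn = ToyConnection()
    for number, frame in frames:
        conn.handle(number, frame)
    return conn


# Order seen in the successful run.
GOOD_ORDER = [(5, "BasicCancelOk"), (1, "ConnectionCloseOk")]
# Order seen in the failing run: cancel-ok arrives after close-ok.
BAD_ORDER = [(1, "ConnectionCloseOk"), (5, "BasicCancelOk")]
```

Replaying `GOOD_ORDER` ends with a cleanly closed toy connection, while replaying `BAD_ORDER` fails on the second frame because promise 5 no longer exists.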
Interesting. So you're saying there is a race. Does it also happen when you add basic_cancel for the basic_consume promise?
I still can't reproduce this. Which RMQ do you use?
i.e. the client receives the FrameBasicCancelOk (for the consume_promise) after the FrameConnectionCloseOk.
How is that even possible?
Do I get it right: on the network layer RMQ first sends the connection_close_ok frame and later sends the basic_cancel_ok frame?
RabbitMQ 2.8.6 on 64-bit Linux. The tests are running on the same machine, and it looks like I need to put a bit of load on the machine. I've got something like os.walk implemented via RabbitMQ, which I run at the same time. I run the following shell command:
while true; do py.test --pdb -s -k test_close tests/test_basic.py; done
My guess is that FrameBasicCancelOk can be received after FrameConnectionCloseOk because RabbitMQ is sending them in exactly that order (and then closes the connection). :) Is that a bug in RabbitMQ?
If I basic_cancel the consume_promise first and wait for it, all should be fine, since RabbitMQ then doesn't have anything sensible left to tell me. I wouldn't expect a problem... but if you insist on it, I can test it...
Yes, I would say that after the "connection close ok" frame nothing more should be sent by RMQ on the TCP/IP layer. I'm not sure if that really happens. If you can prove that, it would be nice to ask the rabbitmq-discuss mailing list whether they consider it a bug.
Should we clear receive buffer on handling of connection_close_ok frame?
(or even better: set a flag to ignore all future reads from the network)
Setting a flag looks reasonable. Another option may be to throw an exception, and catch in the calling code...
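A rough Python 3 sketch of the flag idea, under the assumption that frames arrive one at a time at a single entry point. `ToyReader`, `on_frame`, `handled` and `dropped` are hypothetical names made up for illustration and are not part of puka.

```python
# Hypothetical sketch of "set a flag and ignore later reads".

class ToyReader:
    def __init__(self):
        self.closed = False   # set once close-ok has been handled
        self.handled = []     # frames that were processed normally
        self.dropped = []     # frames ignored because they arrived too late

    def on_frame(self, frame):
        if self.closed:
            # The connection is already shut down: do not dispatch.
            self.dropped.append(frame)
            return
        self.handled.append(frame)
        if frame == "ConnectionCloseOk":
            self.closed = True


reader = ToyReader()
for frame in ["QueueDeleteOk", "ConnectionCloseOk", "BasicCancelOk"]:
    reader.on_frame(frame)
```

With this ordering the late cancel-ok lands in `dropped` instead of reaching a promise that is already done. The exception-based alternative would replace the silent `return` with a `raise` that the calling code catches.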
I may have written a test for it, but I'm still waiting on your opinion on py.test.
Can you reproduce the issue with https://gist.github.com/4344132 ? It's basically just a loop over the code in question. I can easily reproduce the error on an otherwise unloaded machine with two instances of this program running...
Thanks for the code, now I can reproduce the issue :)
Try this:
diff --git a/puka/connection.py b/puka/connection.py
index ae51c07..ec6ea61 100644
--- a/puka/connection.py
+++ b/puka/connection.py
@@ -107,7 +107,7 @@ class Connection(object):
         if len(self.recv_buf) >= self.recv_need:
             data = self.recv_buf.read()
             offset = 0
-            while len(data) - offset >= self.recv_need:
+            while len(data) - offset >= self.recv_need and self.sd:
                 offset, self.recv_need = \
                     self._handle_read(data, offset)
             self.recv_buf.consume(offset)
If you're happy building RabbitMQ from source, you can try branch bug25360, which I believe fixes the behaviour you're seeing in the server.
I tried it and I can't reproduce the issue when using that branch! Thanks!
@schmir unless you oppose I'll apply the patch mentioned in https://github.com/majek/puka/issues/34#issuecomment-11568573
@majek sure, go ahead. that also seems to work.
probably better "self.sd is not None"
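The effect of the extra loop condition can be sketched in Python 3 as follows. This is a toy, not puka's code: it assumes that shutdown sets `self.sd` to `None`, uses fixed 4-byte "frames", and the frame contents `b"ACK."`, `b"CLOS"` and `b"CNOK"` are invented for the example.

```python
# Toy version of the patched read loop (hypothetical, fixed-size frames).

class ToyConnection:
    FRAME_SIZE = 4

    def __init__(self):
        self.sd = object()               # placeholder for an open socket
        self.recv_need = self.FRAME_SIZE
        self.handled = []

    def _handle_read(self, data, offset):
        frame = data[offset:offset + self.FRAME_SIZE]
        self.handled.append(frame)
        if frame == b"CLOS":
            self.sd = None               # assumed effect of shutdown
        return offset + self.FRAME_SIZE, self.FRAME_SIZE

    def process(self, data):
        offset = 0
        # Stop as soon as the socket is gone, even if more complete
        # frames are still sitting in the buffer.
        while len(data) - offset >= self.recv_need and self.sd is not None:
            offset, self.recv_need = self._handle_read(data, offset)
        return offset


conn = ToyConnection()
consumed = conn.process(b"ACK.CLOSCNOK")
```

Here the third frame, which was buffered behind the close frame, is never dispatched. Comparing with `is not None` states the intent explicitly and does not depend on how the socket object behaves in a boolean context.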
I see intermittent errors in test_basic. I've seen this in test_close and test_basic_qos, when the code is waiting on the client.close() promise: