jasonrbriggs / stomp.py

“stomp.py” is a Python client library for accessing messaging servers (such as ActiveMQ or RabbitMQ) using the STOMP protocol (versions 1.0, 1.1 and 1.2). It can also be run as a standalone, command-line client for testing.
Apache License 2.0
491 stars 167 forks source link

[#393] Fix race condition between ack/nack and disconnect #394

Open carantunes opened 2 years ago

carantunes commented 2 years ago

Issue: #393

I identified a race condition when there's some load of processing messages and a disconnect occurs. This issue only occurs with ack mode client/client-individual which means a ACK or NACK frame will be sent to the server.

In such cases, between the milliseconds a disconnect begins and concludes, some other threads might start processing a new message and when trying to send the frame ACK/NACK will result in either a native BrokenPipeError (unhandled by stompy) or a org.apache.activemq.transport.stomp.ProtocolException: Not connected (properly handled).

I'm providing a MR associated with the issue, showcasing the issue in a new test case and a solution. For the test case I rely on repetition given that the issue is a race condition.

For the solution I suggest the addition of a wait parameter to the disconnect method, which allows instantly stop receiving and enables wait for current threads to finish already started acks/nacks before stoping executing and closing the socket.

carantunes commented 2 years ago

Please note the difference between both commits, I believe the second solution might be a bit more clear/easy to maintain.

jasonrbriggs commented 1 year ago

The unit test doesn't work for me when I run locally, unfortunately:

DEBUG    stomp.py:transport.py:287 Sending frame: [b'ACK', b'\n', b'message-id:ID\\c442e3d5db544-39705-1662916537185-3\\c253\\c-1\\c1\\c7\n', b'subscription:1\n', b'\n', b'\x00']
ERROR    stomp.py:transport.py:679 Error sending frame
Traceback (most recent call last):
  File "/home/jasonrbriggs/Development/stomppy/stomp/transport.py", line 676, in send
    self.socket.sendall(encoded_frame)
BrokenPipeError: [Errno 32] Broken pipe
ERROR    stomp.py:test_disconnect_wait.py:28 Expected BrokenPipeError
DEBUG    stomp.py:transport.py:202 Received frame: 'MESSAGE', headers={'content-length': '12', 'expires': '0', 'destination': '/queue/disconnectmidack-20220911191146', 'subscription': '1', 'priority': '4', 'message-id': 'ID:442e3d5db544-39705-1662916537185-3:253:-1:1:8', 'timestamp': '1662919906243'}, body='test message'
DEBUG    stomp.py:transport.py:287 Sending frame: [b'ACK', b'\n', b'message-id:ID\\c442e3d5db544-39705-1662916537185-3\\c253\\c-1\\c1\\c8\n', b'subscription:1\n', b'\n', b'\x00']
ERROR    stomp.py:transport.py:679 Error sending frame
Traceback (most recent call last):
  File "/home/jasonrbriggs/Development/stomppy/stomp/transport.py", line 676, in send
    self.socket.sendall(encoded_frame)
BrokenPipeError: [Errno 32] Broken pipe
ERROR    stomp.py:test_disconnect_wait.py:28 Expected BrokenPipeError
DEBUG    stomp.py:transport.py:202 Received frame: 'MESSAGE', headers={'content-length': '12', 'expires': '0', 'destination': '/queue/disconnectmidack-20220911191146', 'subscription': '1', 'priority': '4', 'message-id': 'ID:442e3d5db544-39705-1662916537185-3:253:-1:1:9', 'timestamp': '1662919906244'}, body='test message'
DEBUG    stomp.py:transport.py:287 Sending frame: [b'ACK', b'\n', b'message-id:ID\\c442e3d5db544-39705-1662916537185-3\\c253\\c-1\\c1\\c9\n', b'subscription:1\n', b'\n', b'\x00']
ERROR    stomp.py:transport.py:679 Error sending frame
Traceback (most recent call last):
  File "/home/jasonrbriggs/Development/stomppy/stomp/transport.py", line 676, in send
    self.socket.sendall(encoded_frame)
BrokenPipeError: [Errno 32] Broken pipe
ERROR    stomp.py:test_disconnect_wait.py:28 Expected BrokenPipeError
DEBUG    stomp.py:transport.py:202 Received frame: 'MESSAGE', headers={'content-length': '12', 'expires': '0', 'destination': '/queue/disconnectmidack-20220911191146', 'subscription': '1', 'priority': '4', 'message-id': 'ID:442e3d5db544-39705-1662916537185-3:253:-1:1:10', 'timestamp': '1662919906245'}, body='test message'
DEBUG    stomp.py:transport.py:287 Sending frame: [b'ACK', b'\n', b'message-id:ID\\c442e3d5db544-39705-1662916537185-3\\c253\\c-1\\c1\\c10\n', b'subscription:1\n', b'\n', b'\x00']
ERROR    stomp.py:transport.py:679 Error sending frame
Traceback (most recent call last):
  File "/home/jasonrbriggs/Development/stomppy/stomp/transport.py", line 676, in send
    self.socket.sendall(encoded_frame)
BrokenPipeError: [Errno 32] Broken pipe
ERROR    stomp.py:test_disconnect_wait.py:28 Expected BrokenPipeError
DEBUG    stomp.py:test_disconnect_wait.py:81 T10 : messages started 23441
------------------------- generated html file: file:///home/jasonrbriggs/Development/stomppy/tmp/report.html --------------------------

---------- coverage: platform linux, python 3.10.4-final-0 -----------
Coverage HTML written to dir ../stomppy-docs/htmlcov/

======================================================= short test summary info =======================================================
FAILED tests/test_disconnect_wait.py::test_assert_fixed_race_condition_in_disconnect_mid_ack - AssertionError: should not have errors
============================================== 1 failed, 98 passed in 124.21s (0:02:04) ===============================================