Open abbiemery opened 1 year ago
I have investigated this, and I've narrowed down the cause of this issue. Importantly, I've found a fix for it:
This stems from the following test:
@pytest.mark.asyncio
async def test_socket_cleaned_up_on_cancel(
mock_device: Device,
mock_raise_interrupt: RaiseInterrupt,
) -> None:
adapter_a = ZeroMqPushAdapter()
adapter_b = ZeroMqPushAdapter()
for adapter in (adapter_a, adapter_b):
task = asyncio.create_task(
adapter.run_forever(
mock_device,
mock_raise_interrupt,
)
)
await adapter.send_message([b"test"])
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert task.done()
What seems to happen is, occasionally when the for loop gets to the await adapter.send_message([])
line for the 2nd time, the socket from the previous run of the loop hasn't been shut down properly.
I've found threads like https://github.com/aio-libs/aiozmq/issues/72#issuecomment-280639202 and https://stackoverflow.com/questions/45805714/how-to-close-properly-with-aiozmq-zmq, which led me to find out about zmq's LINGER property, documented (here)[http://api.zeromq.org/2-1%3azmq-setsockopt#toc15]. This suggests that by default, when zmq connections are made and a message is sent (but hasn't been recieved by the peer), the message will just linger - to me this suggests that the connection will not fully close.
I've tried to fix this by doing:
async def create_zmq_push_socket(host: str, port: int) -> aiozmq.ZmqStream:
addr = f"tcp://{host}:{port}"
stream = await aiozmq.create_zmq_stream(zmq.PUSH, connect=addr, bind=addr)
transport = cast(aiozmq.ZmqTransport, stream.transport)
transport.setsockopt(zmq.LINGER, 0)
return stream
But that hasn't seemed to help anything. So, I found another fix; aborting the stream transport. i.e. force killing it. Maybe in future we can investigate exactly why this is happening but... it's really quite annoying to debug aiozmq library as it's not consistently typed.
After much deliberation, I've decided the fix to this is to move from aiozmq to pyzmq, as the latter is supported for later versions of python and has had successful CI jobs for recent releases (https://github.com/zeromq/pyzmq/tree/v25.0.2)
I've noticed a few times now that there are sporadic failures of CI jobs in the pytest stage due to socket issues. It is hard to replicate since there is no obvious decernable pattern to when they fail, but this needs investigating.