zeromq / pyzmq

PyZMQ: Python bindings for zeromq
http://zguide.zeromq.org/py:all
BSD 3-Clause "New" or "Revised" License

BUG: Asyncio cancellation race condition leaves Future exception never retrieved for recv_string() #1925

Closed: ericjmcd closed this issue 6 months ago

ericjmcd commented 6 months ago

What pyzmq version?

25.1.2

What libzmq version?

4.3.4

Python version (and how it was installed)

python 3.12 via ppa:deadsnakes/ppa

OS

ubuntu 20.04 (inside docker running on ubuntu 20.04 host)

What happened?

When cancelling an asyncio task that loops on a PULL socket's recv_string() with RCVTIMEO set, a zmq Future exception (zmq.error.Again: Resource temporarily unavailable) is never retrieved, even though the calls are wrapped in try/except blocks. This does not happen with a plain recv(). I haven't checked whether it also affects other variants (e.g. recv_json) or other socket types.

Code to reproduce bug

# junk.py
# Some kind of race condition between the task/eventloop ending and a recv timeout
import asyncio
import zmq
import zmq.asyncio

async def recv(sock):
    while True:
        try:
            msg = await sock.recv_string()
        except zmq.Again:
            continue
        except asyncio.CancelledError:
            print('sock recv cancelled')
            break
    sock.close()

async def main():
    ctx = zmq.asyncio.Context.instance()
    sock = ctx.socket(zmq.PULL)
    sock.bind('tcp://0.0.0.0:1234')
    sock.RCVTIMEO = 1  # If you set this larger than below sleep, issue never happens (I think)
    sock.LINGER = 0
    rx_task = asyncio.create_task(recv(sock))
    await asyncio.sleep(1)
    rx_task.cancel()
    try:
        await rx_task
    except asyncio.CancelledError:
        print('cancelled')
    except zmq.Again:
        # This never catches the ZMQ Future exception
        print('timed out')

asyncio.run(main())

Traceback, if applicable

~ <DKR> python3 junk.py 
Future exception was never retrieved
future: <Future finished exception=Again('Resource temporarily unavailable')>
zmq.error.Again: Resource temporarily unavailable
sock recv cancelled

From VSCode:
 ~ <DKR>  cd /workspace ; /usr/bin/env /bin/python3.12 /tmp/.vscode-server/extensions/ms-python.python-2024.0.1/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher 39359 -- /tmp/junk.py 
Future exception was never retrieved
future: <Future finished exception=Again('Resource temporarily unavailable') created at /usr/local/lib/python3.12/dist-packages/zmq/_future.py:473>
source_traceback: Object created at (most recent call last):
  File "/usr/lib/python3.12/runpy.py", line 198, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.12/runpy.py", line 88, in _run_code
    exec(code, run_globals)
  File "/tmp/.vscode-server/extensions/ms-python.python-2024.0.1/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/__main__.py", line 39, in <module>
    cli.main()
  File "/tmp/.vscode-server/extensions/ms-python.python-2024.0.1/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 430, in main
    run()
  File "/tmp/.vscode-server/extensions/ms-python.python-2024.0.1/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 284, in run_file
    runpy.run_path(target, run_name="__main__")
  File "/tmp/.vscode-server/extensions/ms-python.python-2024.0.1/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 321, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/tmp/.vscode-server/extensions/ms-python.python-2024.0.1/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 135, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/tmp/.vscode-server/extensions/ms-python.python-2024.0.1/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 124, in _run_code
    exec(code, run_globals)
  File "/tmp/junk.py", line 35, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/usr/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
    self._run_once()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 1963, in _run_once
    handle._run()
  File "/usr/lib/python3.12/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/tmp/junk.py", line 9, in recv
    msg = await sock.recv_string()
  File "/usr/local/lib/python3.12/dist-packages/zmq/_future.py", line 427, in recv_string
    return super().recv_string(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.12/dist-packages/zmq/sugar/socket.py", line 934, in recv_string
    msg = self.recv(flags=flags)
  File "/usr/local/lib/python3.12/dist-packages/zmq/_future.py", line 309, in recv
    return self._add_recv_event('recv', dict(flags=flags, copy=copy, track=track))
  File "/usr/local/lib/python3.12/dist-packages/zmq/_future.py", line 473, in _add_recv_event
    f = future or self._Future()
zmq.error.Again: Resource temporarily unavailable
sock recv cancelled

More info

No response

minrk commented 6 months ago

Thanks for reporting! recv_string (and recv_json, etc.) does add one extra layer of future chaining on top of recv. The warning happens in that chaining step, because the underlying recv timeout and the cancellation have both completed in the same tick. The fix for this more common situation is to add an f.exception() call to consume the error. However, this reveals a risk of lost messages, because chained futures do not propagate immediately; they propagate on the next event loop tick. So the warning is actually telling us that something isn't hooked up right!
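
To illustrate the f.exception() idea, here is a minimal sketch using plain asyncio futures. It is not pyzmq's actual code: `chain`, `demo`, and the `consume` flag are hypothetical names, and a builtin TimeoutError stands in for zmq.Again. It assumes asyncio reports an unretrieved exception through the loop's exception handler when the future is garbage collected.

```python
import asyncio
import gc


def chain(inner: asyncio.Future, outer: asyncio.Future, consume: bool) -> None:
    """Forward inner's outcome to outer (hypothetical helper, not pyzmq's code)."""

    def _forward(f: asyncio.Future) -> None:
        if outer.done():
            # Nobody is waiting on `outer` any more (it was cancelled).
            # Reading the exception marks it as retrieved for asyncio.
            if consume and not f.cancelled():
                f.exception()
            return
        if f.cancelled():
            outer.cancel()
        elif f.exception() is not None:
            outer.set_exception(f.exception())
        else:
            outer.set_result(f.result())

    inner.add_done_callback(_forward)


async def demo(consume: bool) -> list:
    loop = asyncio.get_running_loop()
    reports = []
    # Capture what asyncio would otherwise log as a warning.
    loop.set_exception_handler(lambda _loop, ctx: reports.append(ctx["message"]))

    inner, outer = loop.create_future(), loop.create_future()
    chain(inner, outer, consume)
    # The timeout and the cancellation land in the same tick.
    inner.set_exception(TimeoutError("stand-in for zmq.Again"))
    outer.cancel()
    await asyncio.sleep(0)  # let the done callback run

    del inner, outer
    gc.collect()  # drop the futures so asyncio can report anything unretrieved
    return reports


print(asyncio.run(demo(consume=False)))
print(asyncio.run(demo(consume=True)))
```

With `consume=False` the captured list should contain the "exception was never retrieved" report; with `consume=True` it should be empty. The sketch only silences the report and does nothing about the lost-message risk described above.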

FWIW, in this example, you're setting RCVTIMEO to 1 millisecond and sleeping for 1 second. So there are hundreds of zmq.Again exceptions that are caught, and one where the cancel is attempted and fails.

It triggers the issue more reliably for me if I set RCVTIMEO = 100 and time.sleep(0.1), so that both the cancel and the Again timeout are set to occur at the same time.

I think losing messages like this is quite unlikely, but not impossible. Missing error messages is far more likely, and easy to fix.

Really, what should happen here is that a completed operation shouldn't be allowed to cancel. The problem is that the chaining pyzmq does via future.add_done_callback takes one additional event-loop tick to propagate the cancellation, which means you can try to cancel an operation that won't actually be cancelled, even though you've given up waiting for it.
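
The one-tick delay can be seen with plain asyncio futures. The following is a sketch under assumptions, not pyzmq's implementation: `demo` and `_forward` are hypothetical names, and the string "message" stands in for a received zmq message. It relies on the documented behaviour that callbacks registered with add_done_callback are scheduled on the event loop rather than run inline.

```python
import asyncio


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    inner, outer = loop.create_future(), loop.create_future()

    def _forward(f: asyncio.Future) -> None:
        # Copy the result across only if the outer future is still pending.
        if not outer.done():
            outer.set_result(f.result())

    inner.add_done_callback(_forward)

    inner.set_result("message")    # the underlying operation has completed
    done_same_tick = outer.done()  # the callback is only scheduled, not yet run
    cancelled = outer.cancel()     # so cancelling the outer future still succeeds

    await asyncio.sleep(0)         # the callback runs now and finds outer done
    return done_same_tick, cancelled, outer.cancelled()


print(asyncio.run(demo()))  # (False, True, True)
```

The inner future holds a result that the caller of the outer future never sees, which is the lost-message window: the operation finished, yet the cancel was accepted.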

minrk commented 6 months ago

#1938 fixes the warning in the most common case, where the unretrieved exception is a zmq.Again, which can be safely ignored.