zeromq / pyzmq

PyZMQ: Python bindings for zeromq
http://zguide.zeromq.org/py:all
BSD 3-Clause "New" or "Revised" License
3.62k stars 635 forks source link

BUG: PUBHandler is not thread-safe when using with async Context #1967

Open rmorshea opened 3 months ago

rmorshea commented 3 months ago

This is a pyzmq bug

What pyzmq version?

22.3.0

What libzmq version?

4.3.4

Python version (and how it was installed)

Python 3.10 via conda-forge

OS

macOS 14

What happened?

If you attempt to send a message from a different thread that doesn't have an event loop running when using the zqm.asyncio.Context you'll get an error complain about missing event loop because Socket._Future() requests access to the currently running event loop by default.

In most cases this would be user error, but in the context of logging, it's quite hard to avoid. For example, when interacting with sync library code it's quite common to use asyncio.to_thread to prevent blocking calls. If the library you're calling in a thread then logs, you'll get an error because the thread spawned to do the work doesn't have an event loop.

This has came up in several different ways for me. First when using ddtrace (a Datadog client library) and then later in my own library code.

Code to reproduce bug

import asyncio
import logging

import zmq
from zmq.asyncio import Context, Socket
from zmq.log.handlers import PUBHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def make_sockets(ctx: Context) -> tuple[Socket, Socket]:
    addr = 'tcp://127.0.0.1'

    first = ctx.socket(zmq.PAIR)
    first.linger = 0
    port = first.bind_to_random_port(addr)

    second = ctx.socket(zmq.PAIR)
    second.linger = 0
    second.connect(f'{addr}:{port}')

    return first, second

async def print_messages(sub: Socket, stop: asyncio.Event):
    while True:
        msg_task = sub.recv_multipart()
        stop_task = asyncio.create_task(stop.wait())
        done, _ = await asyncio.wait({msg_task, stop_task}, return_when=asyncio.FIRST_COMPLETED)
        if stop_task in done:
            break
        print(await msg_task)

async def main():
    with Context() as ctx:
        pub, sub = make_sockets(ctx)

        # backgound task to read sent message
        stop_printing = asyncio.Event()
        print_task = asyncio.create_task(print_messages(sub, stop_printing))

        # create some logs
        logger.addHandler(PUBHandler(pub))
        logger.info("hello")  # log outside thread
        await asyncio.to_thread(logger.info, "world")  # log in thread

        # wait for messages to tbe sent
        await asyncio.sleep(1)

        # stop the background task
        stop_printing.set()
        await print_task

if __name__ == "__main__":
    asyncio.run(main())

Traceback, if applicable

Traceback (most recent call last):
  File "main.py", line 58, in <module>
    asyncio.run(main())
  File "...venv/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "...venv/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "...venv.py", line 47, in main
    await asyncio.to_thread(logger.info, "world")  # log in thread
  File "...venv/lib/python3.10/asyncio/threads.py", line 25, in to_thread
    return await loop.run_in_executor(None, func_call)
  File "...venv/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...venv/lib/python3.10/logging/__init__.py", line 1477, in info
    self._log(INFO, msg, args, **kwargs)
  File "...venv/lib/python3.10/logging/__init__.py", line 1624, in _log
    self.handle(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 1634, in handle
    self.callHandlers(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
    hdlr.handle(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 968, in handle
    self.emit(record)
  File "...venv/lib/python3.10/site-packages/zmq/log/handlers.py", line 186, in emit
    self.socket.send_multipart([btopic, bmsg])
  File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 321, in send_multipart
    return self._add_send_event('send_multipart', msg=msg_parts, kwargs=kwargs)
  File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 509, in _add_send_event
    f = future or self._Future()
  File "...venv/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'asyncio_0'.

More info

I'm currently using the following as a workaround:

import asyncio
import logging
from typing import Any

from zmq.log.handlers import PUBHandler as _PUBHandler

class PUBHandler(_PUBHandler):

    def __init__(self, *args: Any, loop: asyncio.AbstractEventLoop = None, **kwargs: Any) -> None:
        self.event_loop = loop or _try_get_running_loop()
        super().__init__(*args, **kwargs)

    def emit(self, record: logging.LogRecord) -> None:
        if self.event_loop is None:
            super().emit(record)
        else:
            self.event_loop.call_soon_threadsafe(super().emit, record)

def _try_get_running_loop() -> asyncio.AbstractEventLoop | None:
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None
minrk commented 3 months ago

I don't think PUBHandler should accept async sockets, so the fix is probably to cast async sockets to sync sockets if they are given. The workaround is to do it yourself before passing to PUBHandler, I think.

rmorshea commented 3 months ago

Why shouldn't the PUBHandler accept an async socket? It seems like it would be valuable to avoid blocking calls to the socket whenever you hit log statements.

minrk commented 3 months ago

Why shouldn't the PUBHandler accept an async socket?

For one, PUB sockets effectively never block anyway. If they are backed up, they drop messages instead of blocking. Plus, in general, zmq sockets (at the libzmq level) are not threadsafe, so I think it is not actually safe to use PUBHandler with any logger that may be used concurrently from multiple threads with any kind of socket. In that way, I suppose using call_soon_threadsafe might actually be safer than a sync socket. But then you'd need to be sure you are connecting to the right event loop, and I don't think PUBHandler has enough information to do that (e.g. consecutive calls to asyncio.run which are in one thread, but two event loops).

Cases to consider:

A separate AsyncPUBHandler that has declared semantics (i.e. attaches to the running loop at start and requires that the loop is running) could make sense, though, but I think it would cause problems in the first two cases to adopt that behavior in the base PUBHandler.

rmorshea commented 3 months ago

I think a separate AsyncPUBHandler makes a lot of sense.

One way to address the first concern might be to copy asyncio.mixins._LoopBoundMixin which implements the following in order to lazily determine the loop to use:

class _LoopBoundMixin:
    _loop = None

    def _get_loop(self):
        loop = events._get_running_loop()

        if self._loop is None:
            with _global_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
            raise RuntimeError(f'{self!r} is bound to a different event loop')
        return loop

However, I could see this being a bit challenging to reason about since it wouldn't be immediately obvious what loop the handler would be bound to. The only way to find out would be to read the code to determine what loop is active when the first log is produced.

As a compromise, perhaps a loop argument to AsyncPUBHandler could accept:

minrk commented 3 months ago

That sounds reasonable. Want to have a go? I'm hoping to release pyzmq 26 soon, and imagine 26.1 will likely come pretty soon after with fixes for issues that get reported with the new build system.

rmorshea commented 3 months ago

Great. Will try and take a crack at it when I find time.