zeromq / pyzmq

PyZMQ: Python bindings for zeromq
http://zguide.zeromq.org/py:all
BSD 3-Clause "New" or "Revised" License

BUG: pypy pyzmq randomly fails in asserts #2043

Open Sec42 opened 4 days ago

Sec42 commented 4 days ago

This is a pyzmq bug

What pyzmq version?

26.2.0

What libzmq version?

4.3.5

Python version (and how it was installed)

Python 3.8.13 (7.3.9+dfsg-1ubuntu0.1, Nov 15 2022, 06:22:50) [PyPy 7.3.9 with GCC 11.3.0] on linux

OS

Ubuntu 22.04.5 LTS

What happened?

My script using zmq randomly dies with different assertion failures after a few (~1-8) hours.

Example asserts:

Assertion failed: false (src/object.cpp:142)
Assertion failed: ok (src/mailbox.cpp:72)

Code to reproduce bug

    import zmq

    url = "tcp://127.0.0.1:4223"

    context = zmq.Context()
    socket = context.socket(zmq.XPUB)
    socket.setsockopt(zmq.XPUB_VERBOSE, True)
    socket.bind(url)

    def zmq_thread(socket):
        try:
            while True:
                event = socket.recv()
                # Event is one byte 0=unsub or 1=sub, followed by topic
                if event[0] == 1:
                    log("new subscriber for", event[1:])
                elif event[0] == 0:
                    log("unsubscribed", event[1:])

        except zmq.error.ContextTerminated:
            pass

    from threading import Thread
    zthread = Thread(target=zmq_thread, args=[socket], daemon=True, name='zmq')
    zthread.start()

    # real code of course does some more work locally and sends changing messages.
    while True:
        socket.send_string("FOO BAR")

Traceback, if applicable

No response

More info

Last time I checked, it did not fail when run without PyPy.
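
For reference, the event format described in the comment in the script above (one byte, 0 for unsubscribe or 1 for subscribe, followed by the topic) can be decoded by a small pure-Python helper. This is only a sketch: `decode_subscription_event` is a hypothetical name, not part of pyzmq, and it assumes the frame really is a subscription event.

```python
from typing import Tuple


def decode_subscription_event(event: bytes) -> Tuple[bool, bytes]:
    """Split an XPUB event frame into (is_subscribe, topic).

    Hypothetical helper for illustration; not part of pyzmq.
    """
    if not event:
        raise ValueError("empty event frame")
    flag = event[0]  # indexing bytes yields an int in Python 3
    if flag not in (0, 1):
        raise ValueError(f"not a subscription event: first byte is {flag}")
    return flag == 1, event[1:]


print(decode_subscription_event(b"\x01FOO"))  # (True, b'FOO')
print(decode_subscription_event(b"\x00FOO"))  # (False, b'FOO')
```

Rejecting any other first byte keeps ordinary messages from being silently logged as subscriptions.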

minrk commented 4 days ago

libzmq sockets are not thread safe. You must not use a socket from multiple threads at once unless you are careful to use a lock around all socket methods.

Sec42 commented 4 days ago

I'm not sure I understand how you are supposed to receive the (asynchronous) notifications about clients connecting/disconnecting with pyzmq then. Can you give an example of how to do it correctly?

Thanks

minrk commented 4 days ago

You can use pollers or an event loop such as asyncio. Here's an asyncio version of your script that adds a subscriber, so you can see the subscription messages coming in while sending is happening:

    import asyncio
    from threading import Thread
    import time

    import zmq.asyncio

    # for now
    log = print

    async def log_subscribers(socket):
        try:
            while True:
                event = await socket.recv()
                # Event is one byte 0=unsub or 1=sub, followed by topic
                if event[0] == 1:
                    log("new subscriber for", event[1:])
                elif event[0] == 0:
                    log("unsubscribed", event[1:])

        except zmq.ContextTerminated:
            pass

    def subscriber_main(url):
        with zmq.Context() as ctx:
            for i in range(10):
                with ctx.socket(zmq.SUB) as sub:
                    sub.connect(url)
                    topic = f"topic{i}"
                    sub.subscribe(topic)
                    time.sleep(0.1)
                    sub.unsubscribe(topic)
                    time.sleep(0.1)

    async def main():
        url = "tcp://127.0.0.1:4223"

        context = zmq.asyncio.Context()
        socket = context.socket(zmq.XPUB)
        socket.setsockopt(zmq.XPUB_VERBOSE, True)
        socket.bind(url)
        # spawn 'thread' but it's a coroutine in the same thread;
        # keep a reference so the task is not garbage-collected while running
        log_task = asyncio.create_task(log_subscribers(socket))

        # spawn subscribers in an actual thread, so this is a complete demo
        subscriber_thread = Thread(target=subscriber_main, args=(url,), daemon=True)
        subscriber_thread.start()

        # real code of course does some more work locally and sends changing messages.
        while subscriber_thread.is_alive():
            log("sending")
            await socket.send_string("FOO BAR")
            await asyncio.sleep(0.1)

    if __name__ == "__main__":
        asyncio.run(main())

Sec42 commented 3 days ago

Thank you very much for your example. As far as I understand it, this will in effect check/poll the socket after each line is sent, right? I will do some tests based on your example.

minrk commented 3 days ago

Approximately, yeah. It's not quite polling, but every await is an opportunity for another coroutine to take over and advance to its next await, if whatever it was waiting for has become ready. This is sometimes called "cooperative multitasking" because if you write code that has no awaits, no other tasks will run. Threads, by contrast, can be fully concurrent except while locks (such as the GIL) are held.
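
To make the "cooperative" part concrete, here is a small sketch that uses only the standard library and no sockets. The names `worker` and `run` are made up for this illustration. Two tasks record their steps in a shared list; when they await between steps they take turns, and when they never await, the first one runs to completion before the second gets a chance.

```python
import asyncio


async def worker(name, steps, trace, cooperative):
    for i in range(steps):
        trace.append(f"{name}{i}")
        if cooperative:
            # Yield to the event loop so other ready tasks can run.
            await asyncio.sleep(0)


async def run(cooperative):
    trace = []
    # gather schedules both workers as tasks on the same event loop.
    await asyncio.gather(
        worker("a", 3, trace, cooperative),
        worker("b", 3, trace, cooperative),
    )
    return trace


print(asyncio.run(run(cooperative=True)))   # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
print(asyncio.run(run(cooperative=False)))  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
```

In the asyncio script earlier in the thread, `await socket.send_string(...)` and `await asyncio.sleep(0.1)` play the role of the yield points here.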

It is event-driven, so the log_subscribers coroutine will sleep until an event arrives indicating that the socket has something to receive. Similarly, if the socket cannot send yet, send_string will sleep until the send can proceed.
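
The same "sleep until something arrives" behaviour can be seen without any sockets. In the sketch below an `asyncio.Queue` stands in for the socket, which is an assumption made purely for illustration; `receiver` and `demo` are hypothetical names. The receiver is parked on `await inbox.get()` and uses no CPU until an item is put on the queue.

```python
import asyncio


async def receiver(inbox, seen):
    while True:
        # Suspends here until an item is available,
        # much like `await socket.recv()` in the script above.
        item = await inbox.get()
        if item is None:  # sentinel meaning "stop"
            return
        seen.append(item)


async def demo():
    inbox = asyncio.Queue()
    seen = []
    task = asyncio.create_task(receiver(inbox, seen))

    # Nothing has been sent yet, so the receiver stays parked.
    await asyncio.sleep(0.05)
    idle_snapshot = list(seen)

    await inbox.put(b"\x01FOO")  # wakes the receiver
    await inbox.put(None)        # tells it to finish
    await task
    return idle_snapshot, seen


print(asyncio.run(demo()))  # ([], [b'\x01FOO'])
```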