dabeaz / curio

Good Curio!
Other
4.03k stars 241 forks source link

curio_zmq example fails using zmq.PUB/zmq.SUB #335

Closed carlodri closed 3 years ago

carlodri commented 3 years ago

Hi and thank you for all the great care you have taken in developing curio!

I am trying to use the curio_zmq example with a publisher/subscriber socket type, as in:

publisher:

import curio_zmq as zmq

async def publisher(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(address)
    for n in range(100):
        await sock.send(b"Message %d" % n)
    await sock.send(b"exit")

if __name__ == "__main__":
    zmq.run(publisher, "tcp://*:9000")

subscriber:

import curio_zmq as zmq

async def subscriber(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(address)
    while True:
        msg = await sock.recv()
        if msg == b"exit":
            break
        print("Got:", msg)

if __name__ == "__main__":
    zmq.run(subscriber, "tcp://127.0.0.1:9000")

While with the PUSH/PULL sockets everything works, with this PUB/SUB example, the publisher seems to send stuff, but the subscriber just sits there and receives nothing. Do you have any hint on what could be going wrong here? Thanks!

dabeaz commented 3 years ago

It's been awhile since I've used ZMQ, but if my memory serves, using PUB/SUB requires an extra step of invoking some kind of ZMQ socket option to subscribe to a given "feed". It's also possible (if your example), that the publisher simply runs extremely fast and all of the messages are lost before the subscriber gets a chance to connect. What happens if you stick a delay in the loop that produces messages?

carlodri commented 3 years ago

Thanks @dabeaz! Yes, your memory serves you well! In fact, by looking at the zmq examples you do need a socket.setsockopt(zmq.SUBSCRIBE, topicfilter)...

Works perfectly now, thanks and sorry for the noise! If you are interested, I could open a PR with a working zmq PUB/SUB example, although it is quite trivial (if I had read the documentation more carefully 🤦‍♂️ 😜).