zeromq / pyzmq

PyZMQ: Python bindings for zeromq
http://zguide.zeromq.org/py:all
BSD 3-Clause "New" or "Revised" License

BUG: zmq hangs forever if attempting to send a message before socket has time to connect #1911

Closed edmundsj closed 9 months ago

edmundsj commented 9 months ago

What pyzmq version?

25.1.1

What libzmq version?

4.3.4

Python version (and how it was installed)

3.9.6

OS

Mac OS

What happened?

This code works fine:

import time
import zmq

def main():
    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    sender.connect("tcp://localhost:6560")

    receiver = context.socket(zmq.SUB)
    receiver.connect("tcp://localhost:6559")
    receiver.setsockopt_string(zmq.SUBSCRIBE, "")
    time.sleep(0.5)
    while True:
        sender.send_string("hello world")
        string = receiver.recv_string()
        print(string)
        time.sleep(0.5)

if __name__ == "__main__":
    main()

And this code hangs forever:

import time
import zmq

def main():
    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    sender.connect("tcp://localhost:6560")

    receiver = context.socket(zmq.SUB)
    receiver.connect("tcp://localhost:6559")
    receiver.setsockopt_string(zmq.SUBSCRIBE, "")
    # time.sleep(0.5) # This is now commented out.
    while True:
        sender.send_string("hello world")
        string = receiver.recv_string()
        print(string)
        time.sleep(0.5)

if __name__ == "__main__":
    main()

Why? I have a hard time believing that hanging indefinitely after attempting to send a message before a socket is connected was a deliberate design choice. It's a silent failure that is incredibly difficult to debug.

Code to reproduce bug

`remote.py` (run in a separate window, creates a ZMQ device)

import logging
import zmq

logging.basicConfig(level=logging.INFO)

def expose_local_pub_sub(local_pub: int, local_sub: int):
    # Create the sockets before the try block so the finally clause
    # never refers to a name that was not assigned.
    context = zmq.Context()
    frontend = context.socket(zmq.SUB)
    backend = context.socket(zmq.PUB)
    try:
        # Frontend: receives everything sent by connecting publishers.
        frontend.bind(f"tcp://*:{local_sub}")
        frontend.setsockopt_string(zmq.SUBSCRIBE, "")

        # Backend: republishes to connecting subscribers.
        backend.bind(f"tcp://*:{local_pub}")
        logging.info(f"Bound local PUB at port {local_pub} to SUB port at {local_sub}")
        # Blocks while forwarding messages from frontend to backend.
        zmq.device(zmq.FORWARDER, frontend, backend)
    except Exception as e:
        print(e)
    finally:
        frontend.close()
        backend.close()
        context.term()


if __name__ == "__main__":
    expose_local_pub_sub(local_sub=6560, local_pub=6559)


Traceback, if applicable

_No response_

More info

_No response_

minrk commented 9 months ago

Please try to report issues and questions about zeromq socket behavior to libzmq, not pyzmq, which doesn't influence things like socket behavior.

But the example code is hanging in `receiver.recv_string` because there are no messages to receive. PUB sockets discard messages if there are no registered subscribers, and subscriptions take a finite amount of time to propagate from sub->pub over the network. So messages sent after the subscription is established are delivered, while messages sent before that point are discarded.
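The discard behavior can be reproduced in a single process. The following is a minimal sketch, not code from this thread: the `inproc://slow-joiner-demo` endpoint, the helper `recv_string_or_none`, and the timeout values are illustrative assumptions. It polls with a timeout instead of calling a bare `recv_string`, so a missing message shows up as `None` rather than as a hang.

```python
import zmq


def recv_string_or_none(socket, timeout_ms):
    """Return the next message as a string, or None if nothing arrives in time."""
    # Socket.poll returns a non-zero event mask when a message is ready.
    if socket.poll(timeout_ms, zmq.POLLIN):
        return socket.recv_string()
    return None


def slow_joiner_demo():
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    subscriber = context.socket(zmq.SUB)
    try:
        publisher.bind("inproc://slow-joiner-demo")

        # No subscriber exists yet, so the PUB socket discards this message.
        publisher.send_string("sent too early")

        subscriber.connect("inproc://slow-joiner-demo")
        subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

        # The early message is gone; this times out instead of hanging.
        early = recv_string_or_none(subscriber, 200)

        # Resend until the subscription has reached the publisher.
        late = None
        for _ in range(50):
            publisher.send_string("sent after subscribing")
            late = recv_string_or_none(subscriber, 100)
            if late is not None:
                break
        return early, late
    finally:
        publisher.close(linger=0)
        subscriber.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(slow_joiner_demo())
```

Applied to the original example, the same idea would mean polling the receiver with a timeout and resending, rather than relying on a fixed `time.sleep(0.5)` before the first send.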

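Sequence of events with the sleep:

1. the sockets connect and the subscriptions start propagating
2. the sleep gives the subscriptions time to reach the PUB sockets
3. the first message is sent and delivered to the subscriber
4. `recv_string` returns

without sleep:

1. the sockets start connecting
2. the first message is sent immediately, before any subscription is registered, so it is discarded
3. `recv_string` blocks waiting for a message, and because the loop never reaches the next send, it waits forever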

If you want to reach into subscription events, you can use socket monitoring or use the XPUB/XSUB sockets, which let you receive subscription messages and act on that information. If you don't want messages to be discarded, you may want to choose a reliable message delivery pattern instead of PUB/SUB, where discarding messages with no recipient is part of the design.
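As a rough illustration of the XPUB suggestion, the sketch below waits for the subscription message before publishing anything. It is an assumption-laden example rather than a drop-in fix for the code above: the `inproc://xpub-demo` endpoint, the function name `publish_after_subscribe`, and the two-second timeout are all made up for the demonstration. An XPUB socket receives each subscription as a message whose first byte is `1` (subscribe) or `0` (unsubscribe), followed by the topic.

```python
import zmq


def publish_after_subscribe(message="hello world", timeout_ms=2000):
    context = zmq.Context()
    publisher = context.socket(zmq.XPUB)
    subscriber = context.socket(zmq.SUB)
    try:
        # Receive timeouts raise zmq.Again instead of blocking forever.
        publisher.setsockopt(zmq.RCVTIMEO, timeout_ms)
        subscriber.setsockopt(zmq.RCVTIMEO, timeout_ms)

        publisher.bind("inproc://xpub-demo")
        subscriber.connect("inproc://xpub-demo")
        subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

        # Block until the subscription has actually reached the publisher.
        event = publisher.recv()
        if not event.startswith(b"\x01"):
            raise RuntimeError(f"expected a subscribe event, got {event!r}")

        # A subscriber is now registered, so this message is not discarded.
        publisher.send_string(message)
        return subscriber.recv_string()
    finally:
        publisher.close(linger=0)
        subscriber.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(publish_after_subscribe())
```

In the setup from this issue the sender connects to a forwarder rather than binding, so the subscription it would see is the forwarder's, not the final receiver's. Waiting on it only confirms the first hop.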