zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

[bug] subscribe channel does not receive message, even after a barrier among publisher and subscriber #4713

Open youkaichao opened 4 months ago

youkaichao commented 4 months ago


Issue description

Hi team, thanks for the great project! I'm using zmq to broadcast messages in the vLLM project, and I ran into a problem that I think might be related to zmq.

Environment

Minimal test code / Steps to reproduce the issue

# test.py

from zmq import PUB, REP, REQ, SUB, SUBSCRIBE, Context, LAST_ENDPOINT  # type: ignore

import torch

torch.distributed.init_process_group(backend="gloo")

rank = torch.distributed.get_rank()

world_size = torch.distributed.get_world_size()

context = Context()

if rank == 0:

    local_socket = context.socket(PUB)
    # bind to a random port
    local_socket.bind("tcp://*:*")
    local_socket_port = local_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    local_sync_socket = context.socket(REP)
    local_sync_socket.bind("tcp://*:*")
    local_sync_socket_port = local_sync_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    torch.distributed.broadcast_object_list([local_socket_port, local_sync_socket_port], src=0)

    # local readers
    for i in range(world_size - 1):
        recv = local_sync_socket.recv()
        assert recv == b"READY"
        local_sync_socket.send(b"READY")

    local_socket.send(b"READY")

    local_socket.send(b"data")

else:
    data = [None, None]
    torch.distributed.broadcast_object_list(data, src=0)
    local_socket_port, local_sync_socket_port = data

    local_socket = context.socket(SUB)
    local_socket.connect(f"tcp://localhost:{local_socket_port}")
    local_socket.setsockopt_string(SUBSCRIBE, "")

    local_sync_socket = context.socket(REQ)
    local_sync_socket.connect(f"tcp://localhost:{local_sync_socket_port}")

    local_sync_socket.send(b"READY")
    assert local_sync_socket.recv() == b"READY"

    assert local_socket.recv() == b"READY"
    assert local_socket.recv() == b"data"

Run the code about 100 times:

success_count=0
for ((i=1; i<=100; i++)); do
  torchrun --nproc-per-node=8 test.py && ((success_count++)) && echo "Success count: $success_count"
done

About once in every 20 to 50 runs, it hangs. The reason is that even though I put a barrier between the publisher and all subscribers, some subscribers still don't get the message, so they wait forever at assert local_socket.recv() == b"READY".

I'm following https://zguide.zeromq.org/docs/chapter2/#Handling-Multiple-Sockets to add a synchronization point before I publish anything. It works most of the time, but sometimes it fails, i.e. the message is published before all subscribers are ready to receive it.

I found that adding a time.sleep(1) before local_socket.send(b"READY") helps, but that's not an elegant solution.

Is there a way to check this, e.g. can the publisher check whether it has enough subscribers, or can a subscriber check whether it is connected to the publisher and ready to receive messages?

Thanks again for the great project; I look forward to a solution!

jamesdillonharvey commented 4 months ago

"Are there any methods to check, i.e. if the publisher gets enough subscribers, or the subscriber can check if it is connected to the publisher and is ready to receive the message?"

You can use XPUB rather than PUB for your local_socket, then poll local_socket for incoming subscription messages. Each recv on local_socket returns one subscription message: a single byte with value 1 followed by the subscription string. You can then count the connected subscribers and send once you reach the required number.
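A minimal sketch of that counting approach, assuming xpub is a bound pyzmq XPUB socket with XPUB_VERBOSE enabled (so every subscriber's subscribe frame is passed up, not only the first one per topic). The helper name wait_for_subscribers and its timeout_s parameter are hypothetical; the sketch only relies on the socket's poll() and recv() methods and ignores unsubscribe frames.

```python
import time


def wait_for_subscribers(xpub, expected, timeout_s=5.0):
    """Wait until `expected` subscribe frames have arrived on `xpub`.

    Assumption: `xpub` behaves like a pyzmq XPUB socket, i.e. poll(ms)
    returns a non-zero value when a frame is readable and recv() returns
    the frame as bytes. Returns True on success, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    seen = 0
    while seen < expected:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            return False  # not enough subscribers showed up in time
        if not xpub.poll(remaining_ms):
            continue  # nothing readable yet; loop re-checks the deadline
        frame = xpub.recv()
        # A subscribe frame starts with byte 0x01; anything else is skipped.
        if frame[:1] == b"\x01":
            seen += 1
    return True
```

On rank 0 this would be called as wait_for_subscribers(local_socket, world_size - 1) before the first send. Returning False on timeout instead of blocking forever makes a missing subscriber visible rather than turning it into a hang.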

youkaichao commented 4 months ago

Thanks for your quick response!

Using XPUB seems to work. Is the subscription message guaranteed to be b'\x01'? That is what I get when I run the code, but I can't find any documentation about it.
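For reference, the zmq_socket documentation for ZMQ_XPUB describes a subscription message as a byte 1 (subscribe) or byte 0 (unsubscribe) followed by the subscription body, so an empty-topic subscription such as SUBSCRIBE "" would arrive as exactly b'\x01'. A small sketch of decoding such a frame; the helper name parse_subscription is hypothetical and not part of pyzmq:

```python
def parse_subscription(frame: bytes):
    """Split an XPUB subscription frame into (is_subscribe, topic).

    Assumes the frame layout from the ZMQ_XPUB documentation:
    first byte 1 = subscribe, 0 = unsubscribe, remainder = topic prefix.
    """
    if not frame or frame[0] not in (0, 1):
        raise ValueError(f"not a subscription frame: {frame!r}")
    return frame[0] == 1, frame[1:]
```

With the test script above, every subscriber subscribes to the empty topic, so each frame decodes to (True, b"").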

youkaichao commented 4 months ago

Code using XPUB (so that I don't need an additional sync socket any more):

from zmq import SUB, SUBSCRIBE, Context, LAST_ENDPOINT, XPUB, XPUB_VERBOSE  # type: ignore

import torch

torch.distributed.init_process_group(backend="gloo")

rank = torch.distributed.get_rank()

world_size = torch.distributed.get_world_size()

context = Context()

if rank == 0:

    local_socket = context.socket(XPUB)
    local_socket.setsockopt(XPUB_VERBOSE, True)
    # bind to a random port
    local_socket.bind("tcp://*:*")
    local_socket_port = local_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    torch.distributed.broadcast_object_list([local_socket_port], src=0)

    # wait for one subscription message (b"\x01" + topic) from each local reader
    for i in range(world_size - 1):
        local_socket.recv()

    local_socket.send(b"READY")

    local_socket.send(b"data")

else:
    data = [None]
    torch.distributed.broadcast_object_list(data, src=0)
    local_socket_port, = data

    local_socket = context.socket(SUB)
    local_socket.connect(f"tcp://localhost:{local_socket_port}")
    local_socket.setsockopt_string(SUBSCRIBE, "")

    assert local_socket.recv() == b"READY"
    assert local_socket.recv() == b"data"

youkaichao commented 4 months ago

Just to confirm, @jamesdillonharvey, is this issue a bug? I mean that even when I use the local sync sockets to make sure all subscribers have joined, some subscribers are still not ready to receive the message.