zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.67k stars 2.35k forks source link

Fix the bug that the router may block forever when the sending queue is full #4665

Open ZhuXxxx opened 6 months ago

ZhuXxxx commented 6 months ago

If we create a router, and ZMQ_ROUTER_MANDATORY is set to 1, ZMQ_SNDTIMEO is set to -1(default value is -1, infinite),when a peer SNDHWM is reached, we send a message to this peer, at this time the peer disconnected, We will observe that this situation will cause the function to be blocked forever.

We can find this bug through the following code:

static void test_dealer(void* const context)
{   
    void* const dealer = zmq_socket(context, ZMQ_DEALER);
    //
    int rc = zmq_setsockopt(dealer, ZMQ_ROUTING_ID, "X", 1);
    assert(0 == rc);
    rc = zmq_connect(dealer, "inproc://a");
    assert(0 == rc);
    //
    ::std::this_thread::sleep_for(::std::chrono::seconds(5));
    zmq_close(dealer);
    //
    printf("close dealer\n");
}
int main(void)
{
    void* const context = zmq_ctx_new();
    //  
    void* const router = zmq_socket(context, ZMQ_ROUTER);
    //  
    const int on = 1;
    int rc = zmq_setsockopt(router, ZMQ_ROUTER_MANDATORY, &on, sizeof(on));
    assert(0 == rc);
    //  
    rc = zmq_bind(router, "inproc://a");
    assert(0 == rc);
    //  
    ::std::thread thread0(test_dealer, context);
    thread0.detach();
    ::std::this_thread::sleep_for(::std::chrono::seconds(1));
    //  
    while (1) {
        rc = zmq_send(router, "X", 1, ZMQ_SNDMORE);
        if (1 != rc) {
            break;
        }
        rc = zmq_send(router, "hello", 5, 0);
        if (5 != rc) {
            break;
        }
    }
    return 0;
}