zeromq / jeromq

Pure Java ZeroMQ
Mozilla Public License 2.0

Router hangs in send() after client disconnects / doesn't return EHOSTUNREACH #989

Open pmconrad opened 1 month ago

I am not sure how closely this is related to other issues that report hangs in send(), such as #34 and #725.

Minimal example

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class Test {
    public static final String ADDRESS = "tcp://localhost:18989";
    private final Object lock = new Object();
    private volatile int state = 0;

    public static void main(String[] args) {
        new Test().testHang();
    }

    public void testHang() {
        try (var zmqContext = new ZContext()) {
            var thread = new Thread(() -> {
                    try (var socket = zmqContext.createSocket(SocketType.ROUTER)) {
                        socket.setRouterMandatory(true);
                        socket.bind(ADDRESS);
                        step();
                        var addr = socket.recv();
                        var body = socket.recv();
                        step();
                        wait(3);
                        socket.send(addr, ZMQ.SNDMORE); // <--- broker thread hangs here
                        socket.send(body);
                        step();
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                    }
                }, "broker");
            thread.start();
            try (var socket = zmqContext.createSocket(SocketType.DEALER)) {
                wait(1);
                socket.connect(ADDRESS);
                socket.send(new byte[0]);
                wait(2);
            }
            try {
                Thread.sleep(20); // give the socket some time to actually disconnect
            } catch (InterruptedException ignore) {}
            step();
            wait(4);  // <--- main thread hangs here
            while (thread.isAlive()) {
                try {
                    thread.join();
                } catch (InterruptedException ignore) {}
            }
        }
    }

    private void step() {
        synchronized (lock) {
            ++state;
            lock.notifyAll();
        }
    }

    private void wait(int wantedState) {
        synchronized (lock) {
            while (state < wantedState) {
                try {
                    lock.wait();
                } catch (InterruptedException ignore) {}
            }
            if (state != wantedState) { throw new IllegalStateException("Want " + wantedState + " but have " + state); }
        }
    }
}

Tested with 0.5.4, 0.6.0 and latest snapshot. Hangs every time. Relevant thread stacks:

"broker" #14 prio=5 os_prio=0 cpu=62,52ms elapsed=13,50s tid=0x00007f827c1ad5b0 nid=0x178905 runnable  [0x00007f82501f6000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait(java.base@17.0.11/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@17.0.11/EPollSelectorImpl.java:118)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@17.0.11/SelectorImpl.java:129)
        - locked <0x000000062ac26aa0> (a sun.nio.ch.Util$2)
        - locked <0x000000062ac26938> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(java.base@17.0.11/SelectorImpl.java:141)
        at zmq.Signaler.waitEvent(Signaler.java:166)
        at zmq.Mailbox.recv(Mailbox.java:51)
        at zmq.SocketBase.processCommands(SocketBase.java:1136)
        at zmq.SocketBase.send(SocketBase.java:843)
        at zmq.SocketBase.send(SocketBase.java:781)
        at org.zeromq.ZMQ$Socket.send(ZMQ.java:3445)
        at Test.lambda$testHang$0(Test.java:25)
        at Test$$Lambda$10/0x00007f81f400c428.run(Unknown Source)
        at java.lang.Thread.run(java.base@17.0.11/Thread.java:840)
"main" #1 prio=5 os_prio=0 cpu=55,38ms elapsed=13,55s tid=0x00007f827c013d60 nid=0x1788f3 in Object.wait()  [0x00007f82809fe000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(java.base@17.0.11/Native Method)
        - waiting on <0x000000062b032900> (a java.lang.Object)
        at java.lang.Object.wait(java.base@17.0.11/Object.java:338)
        at Test.wait(Test.java:63)
        - locked <0x000000062b032900> (a java.lang.Object)
        at Test.testHang(Test.java:43)
        at Test.main(Test.java:11)
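
Because both threads block indefinitely, the reproduction never terminates on its own. One possible way to make it fail fast is to give the step()/wait() handshake a deadline. The following is only a sketch using the JDK; the class name BoundedSteps and the method await(int, long) are hypothetical and are not part of the original test or of JeroMQ. It does not unblock the broker thread stuck in send(); it only lets the waiting thread notice the hang and report it.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: same step counter as in the reproduction,
// but waiting gives up after a timeout instead of blocking forever.
class BoundedSteps {
    private final Object lock = new Object();
    private int state = 0;

    /** Advances the shared step counter and wakes all waiting threads. */
    void step() {
        synchronized (lock) {
            ++state;
            lock.notifyAll();
        }
    }

    /**
     * Waits until the counter reaches wantedState.
     * Returns true if it was reached, false if the timeout elapsed
     * or the waiting thread was interrupted.
     */
    boolean await(int wantedState, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            while (state < wantedState) {
                long remainingMillis =
                        TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    return false; // deadline passed: report the hang to the caller
                }
                try {
                    // remainingMillis is > 0 here, so this never becomes wait(0),
                    // which would wait without a timeout.
                    lock.wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
            if (state != wantedState) {
                throw new IllegalStateException(
                        "Want " + wantedState + " but have " + state);
            }
            return true;
        }
    }
}
```

In the reproduction, the final wait(4) in the main thread could then become something like `if (!steps.await(4, 5000)) { ... }`, with the timeout value chosen freely, so that the test prints a diagnostic or dumps the thread stacks instead of waiting forever.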

Expected behaviour

send() should return false and set EHOSTUNREACH.

Arguably, per the documentation of setRouterMandatory, the send call may legitimately return only after sendTimeOut has elapsed. sendTimeOut is not set in the example code above. If I do set it, the send() call does return false, but with EAGAIN rather than EHOSTUNREACH.

Related question

If the ROUTER socket has accepted the first frame in a series of send(..., SNDMORE) calls, will it accept all following send(..., SNDMORE) calls and the final send(...) without hanging or waiting for the timeout? (If the router stops accepting frames in the middle of a SNDMORE sequence, I cannot send messages to a different recipient through the same socket. That would be bad.)