zeromq / jeromq

JeroMQ is a pure Java implementation of the ZeroMQ messaging library, offering high-performance asynchronous messaging for distributed or concurrent applications.
https://zeromq.org
Mozilla Public License 2.0

xSub socket subscription throws IllegalArgumentException #916

Open komape opened 2 years ago

komape commented 2 years ago

My system has a broker with two sockets, one xPub and one xSub. When I try to subscribe the xSub socket to all topics, I get an IllegalArgumentException with the error message "Unknown Option 6". It looks like the subscribe call is internally mapped to option number 6, but no switch case handles that value.

ZMQ.Socket xSubSocket = context.createSocket(SocketType.XSUB);
xSubSocket.bind(X_SUB_ADDRESS);
xSubSocket.subscribe(""); // subscribe to all

Exception in thread "main" java.lang.IllegalArgumentException: Unknown Option 6
    at zmq.Options.setSocketOpt(Options.java:547)
    at zmq.SocketBase.setSocketOpt(SocketBase.java:234)
    at org.zeromq.ZMQ$Socket.setSocketOpt(ZMQ.java:910)
    at org.zeromq.ZMQ$Socket.subscribe(ZMQ.java:2113)

What's the recommended way to handle this?

Muspah commented 2 years ago

A bit late, but calling socket#subscribe() on an XSUB socket indeed doesn't work. We worked around it by sending the subscription message ourselves:

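for (String sub : subscriptions) {
    // An XSUB subscription is an ordinary message: byte 0x01 followed by the topic prefix.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    bos.writeBytes(new byte[]{0x01}); // writeBytes requires Java 11+
    bos.writeBytes(sub.getBytes(StandardCharsets.UTF_8));
    socket.send(bos.toByteArray());
}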

Btw, we're using socket#connect() instead of socket#bind() on the XSUB socket, but I no longer remember the reasoning behind that.
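
The workaround above relies on the XSUB wire format: a subscription is a normal message whose first byte is 0x01 (subscribe) or 0x00 (unsubscribe), followed by the topic prefix. As a minimal sketch, the frame building could be pulled into a small helper. The class name XSubFrames and its methods are hypothetical and not part of JeroMQ; only plain JDK calls are used, and the topic is assumed to be UTF-8 text as in the snippet above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of JeroMQ): builds XSUB subscription frames. */
final class XSubFrames {
    private static final byte UNSUBSCRIBE = 0x00;
    private static final byte SUBSCRIBE = 0x01;

    private XSubFrames() {
        // static helpers only
    }

    /** Frame that subscribes to every topic starting with the given prefix ("" matches all). */
    static byte[] subscribe(String topic) {
        return frame(SUBSCRIBE, topic);
    }

    /** Frame that cancels a subscription previously made with the same prefix. */
    static byte[] unsubscribe(String topic) {
        return frame(UNSUBSCRIBE, topic);
    }

    private static byte[] frame(byte command, String topic) {
        byte[] prefix = topic.getBytes(StandardCharsets.UTF_8);
        byte[] frame = new byte[prefix.length + 1];
        frame[0] = command;                                   // command byte comes first
        System.arraycopy(prefix, 0, frame, 1, prefix.length); // then the topic prefix
        return frame;
    }
}
```

With the socket from the original report, subscribing to all topics would then be xSubSocket.send(XSubFrames.subscribe("")), which sends a single byte 0x01.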