zeromq / jeromq

Pure Java ZeroMQ
Mozilla Public License 2.0

Unable to receive messages when binding subscriber socket #289

Open jusw85 opened 9 years ago

jusw85 commented 9 years ago

I'm trying to bind a subscriber socket, and running the following test case. However, the code blocks on recvStr():

    ZContext ctx = new ZContext(1);

    ZMQ.Socket rcvr = ctx.createSocket(ZMQ.SUB);
    rcvr.bind("inproc://control");
    rcvr.subscribe("".getBytes());

    ZMQ.Socket sndr = ctx.createSocket(ZMQ.PUB);
    sndr.connect("inproc://control");

    Thread.sleep(500);
    sndr.send("msg");
    String msg = rcvr.recvStr(); // blocks
    System.out.println(msg);

    ctx.destroySocket(rcvr);
    ctx.destroySocket(sndr);
    ctx.close();

If I bind the publisher socket instead, I don't have this issue:

    ZContext ctx = new ZContext(1);

    ZMQ.Socket sndr = ctx.createSocket(ZMQ.PUB);
    sndr.bind("inproc://control");

    ZMQ.Socket rcvr = ctx.createSocket(ZMQ.SUB);
    rcvr.connect("inproc://control");
    rcvr.subscribe("".getBytes());

    Thread.sleep(500);
    sndr.send("msg");
    String msg = rcvr.recvStr();
    System.out.println(msg); // prints "msg"

    ctx.destroySocket(rcvr);
    ctx.destroySocket(sndr);
    ctx.close();

Is there any way for me to bind the subscriber socket?

miniway commented 9 years ago

Do you have a special use case for binding the SUB socket? It doesn't seem to be very common. PUB manages its subscribers as each SUB connects to it. From the PUB's perspective, there are no subscribers to send the message to because no one has connected to the PUB, so it discards the messages.

jusw85 commented 9 years ago

My understanding of bind is that it's used on the stable parts of the messaging architecture. In my use case, I have one static subscriber that listens on a fixed endpoint, and a dynamic (potentially growing) number of publishers. I use the PUB-SUB pattern because the subscriber needs to pick up all messages sent by any publisher.

From http://zeromq.org/area:faq, "When should I use bind and when connect?":

As a very general advice: use bind on the most stable points in your architecture and connect from the more volatile endpoints.

miniway commented 9 years ago

That's generally right, but PUB binds and SUB connects to the PUB. Otherwise messages might not be delivered. As http://zguide.zeromq.org/page:all#Messaging-Patterns briefly notes, some socket types work only within certain limitations.

In your case, you might need a secondary bound socket of another type, such as DEALER. When a dynamic PUB appears, it first connects to that secondary socket and sends its own endpoint; the main SUB then connects to the PUB at that endpoint. The user guide has similar patterns that exchange control messages.
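A minimal sketch of that announce-then-connect idea, assuming a recent JeroMQ (one that has `SocketType`) and running everything in one thread over `inproc` for brevity. The endpoint names `inproc://registry` and `inproc://publisher-1`, the class name, and the retry loop are illustrative assumptions, not something taken from the guide; in a real deployment the publisher would be a separate process using `tcp` endpoints.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

final class AnnouncePattern {
    // Returns the first message the subscriber receives, or null if nothing arrived.
    static String run() {
        try (ZContext ctx = new ZContext()) {
            // Stable side: a bound control socket (hypothetical endpoint name)
            // plus a SUB socket that connects to publishers on demand.
            ZMQ.Socket registry = ctx.createSocket(SocketType.DEALER);
            registry.setReceiveTimeOut(1000);
            registry.bind("inproc://registry");

            ZMQ.Socket sub = ctx.createSocket(SocketType.SUB);
            sub.subscribe("".getBytes(ZMQ.CHARSET));
            sub.setReceiveTimeOut(200);

            // Dynamic side: the publisher binds its own endpoint, then announces it.
            ZMQ.Socket pub = ctx.createSocket(SocketType.PUB);
            pub.bind("inproc://publisher-1");
            ZMQ.Socket announcer = ctx.createSocket(SocketType.DEALER);
            announcer.connect("inproc://registry");
            announcer.send("inproc://publisher-1");

            // Stable side learns the endpoint and connects the SUB to the PUB.
            String endpoint = registry.recvStr();
            if (endpoint == null) {
                return null; // no announcement arrived within the timeout
            }
            sub.connect(endpoint);

            // Resend a few times: messages published before the subscription
            // reaches the PUB are dropped.
            for (int attempt = 0; attempt < 10; attempt++) {
                pub.send("msg");
                String received = sub.recvStr();
                if (received != null) {
                    return received;
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subscriber still owns the one fixed, bound address (the registry), so publishers only need to know that address, while each PUB/SUB link keeps the usual "PUB binds, SUB connects" direction.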

jusw85 commented 9 years ago

Thanks, guess I'll find a workaround with an intermediary instead. Thanks!

sjohnr commented 8 years ago

Should be:

    ZMQ.Socket rcvr = ctx.createSocket(ZMQ.SUB);
    rcvr.subscribe("".getBytes());
    rcvr.bind("inproc://control");

right? Your subscribe is in the wrong order, I think.

utgarda commented 8 years ago

JZMQ has an identical API, but it allows binding a SUB socket and receiving messages from connected publishing sockets. As soon as I switched to JZMQ, this worked:

$ sbt console
[info] Starting scala interpreter...

scala> import org.zeromq.ZMQ
import org.zeromq.ZMQ

scala> import org.zeromq.ZMQ._
import org.zeromq.ZMQ._

scala> val ctx = ZMQ.context(1)
ctx: org.zeromq.ZMQ.Context = org.zeromq.ZMQ$Context@4e588bb3

scala> val sub = ctx.socket(ZMQ.SUB)
sub: org.zeromq.ZMQ.Socket = org.zeromq.ZMQ$Socket@34f5e96c

scala> sub.bind("tcp://127.0.0.1:5555")

scala> val pub = ctx.socket(ZMQ.PUB)
pub: org.zeromq.ZMQ.Socket = org.zeromq.ZMQ$Socket@22b66894

scala> pub.connect("tcp://127.0.0.1:5555")

scala> sub.subscribe(Array[Byte]())

scala> pub.send("abc")
res3: Boolean = true

scala> sub.recv()
res4: Array[Byte] = Array(97, 98, 99)

This means the behaviour isn't part of ZMQ's design. Nonetheless, JNI bindings have their own problems: JZMQ doesn't even build on my system, so I have to build my project on Ubuntu. I'd be happy to see this fixed in jeromq.

daveyarwood commented 7 years ago

So it seems like the big question here is, should it be possible to bind a SUB socket and connect publishers in JeroMQ like it is in JZMQ?

The ZeroMQ guide seems to suggest that the way to do this is with a secondary bind as described by @miniway.

megfigura commented 6 years ago

Just for reference, I ran into this issue in the project I'm working on. In this project, consumers (SUB) already advertise themselves in ZooKeeper for other reasons, so it would be really convenient for them to publish their ZMQ endpoints there as well. The producer then finds the consumers and creates the necessary PUB sockets to distribute its events. The subscription here would be used to make sure that messages are sharded consistently to the correct consumer, based on an ID within the data that matches the subscription.
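To illustrate the sharding idea: ZeroMQ subscriptions are byte-prefix filters, so a message reaches a SUB only if it starts with one of that socket's subscribed prefixes. The sketch below models just that matching rule in plain Java, without any sockets; the consumer names, the `shard-N:` prefix convention, and the class itself are hypothetical and only meant to show how an ID at the start of the data could select a consumer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PrefixSharding {
    // True when message starts with prefix; an empty prefix matches everything.
    static boolean matches(byte[] prefix, byte[] message) {
        if (prefix.length > message.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (prefix[i] != message[i]) {
                return false;
            }
        }
        return true;
    }

    // Names of the consumers whose subscription prefix matches the message.
    static List<String> recipients(Map<String, String> subscriptions, String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : subscriptions.entrySet()) {
            byte[] prefix = entry.getValue().getBytes(StandardCharsets.UTF_8);
            if (matches(prefix, body)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical consumers, each subscribed to one shard prefix.
        Map<String, String> subscriptions = new LinkedHashMap<>();
        subscriptions.put("consumer-a", "shard-1:");
        subscriptions.put("consumer-b", "shard-2:");

        System.out.println(recipients(subscriptions, "shard-1:order=17")); // [consumer-a]
        System.out.println(recipients(subscriptions, "shard-3:order=18")); // []
    }
}
```

A message whose prefix matches no subscription is delivered to nobody, so the producer has to make sure every ID it emits is covered by some consumer's subscription.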

daveyarwood commented 5 years ago

I'm still not convinced that we need to support binding the SUB and connecting the PUB, but I suppose it would be worth investigating why it doesn't work with JeroMQ and seeing if there is a way we can make it work. Especially given that it reportedly works with jzmq.

ledergec commented 3 years ago

If this ever gets fixed, I would be interested to know. From the age of the issue, I conclude it's not a priority.

sjohnr commented 3 years ago

@ledergec, I'm fairly certain this already works in JeroMQ as long as you subscribe and then bind, as I use this technique all the time for distributed logging and eventing.

I have a library using this technique, though it's written in Kotlin and using jzmq-api on top of jeromq, but you should get the idea:

ledergec commented 3 years ago

I have tried the following code:

    ZContext context = new ZContext();

    var subscriber = context.createSocket(SocketType.SUB);
    subscriber.subscribe("");
    subscriber.bind("tcp://127.0.0.1:9877");
    subscriber.setLinger(0);

    Thread.sleep(100);

    var publisher = context.createSocket(SocketType.PUB);
    publisher.connect("tcp://127.0.0.1:9877");
    publisher.setLinger(0);

    for (int i = 0; i < 10; ++i) {
      Thread.sleep(100);
      publisher.send(new byte[] {(byte) 8, (byte) 2});
    }

    var result = subscriber.recv();
    assertThat(result).isEqualTo(new byte[] {(byte) 8, (byte) 2});

    publisher.close();
    subscriber.close();
    context.close();

If you see anything which needs to be corrected, please let me know!