zeromq / jeromq

JeroMQ is a pure Java implementation of the ZeroMQ messaging library, offering high-performance asynchronous messaging for distributed or concurrent applications.
https://zeromq.org
Mozilla Public License 2.0

High water mark option not working for pub sub #588

Open sherlockedjd opened 6 years ago

sherlockedjd commented 6 years ago

Hi all, can someone please explain, with an example, how to make the high water mark option work for PUB sockets? I have tried, but the publisher seems to publish all the messages even after the HWM value is reached. Can someone provide a complete example where messages are dropped when the HWM is reached on a PUB socket? Thanks in advance. Also note that I have been setting this option before binding my PUB socket.

fredoboulo commented 6 years ago

Hi, did you have a look at the test PubSubHwmTest?

It seems that PUB sockets are not lossy with the option

ZMQ.setSocketOption(pub, ZMQ.ZMQ_XPUB_NODROP, true);

when sending messages without waiting, like:

ZMQ.send(pub, msg, ZMQ.ZMQ_DONTWAIT);

That would allow you to detect when HWM is reached.

If you could describe more precisely how you want to use this, we may be able to help better.
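The idea of detecting a full queue through a non-blocking send can be illustrated without any messaging library. The following is only a model, not JeroMQ code: `BoundedPipe`, `trySend` and `flood` are hypothetical names, and a fixed-capacity `ArrayBlockingQueue` stands in for a socket's outbound queue, with its capacity playing the role of the HWM.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class HwmModel {
    /** Hypothetical stand-in for an outbound queue; the capacity acts as the HWM. */
    static final class BoundedPipe {
        private final ArrayBlockingQueue<String> queue;

        BoundedPipe(int highWaterMark) {
            this.queue = new ArrayBlockingQueue<>(highWaterMark);
        }

        /** Non-blocking send: returns false instead of waiting when the queue is full. */
        boolean trySend(String message) {
            return queue.offer(message);
        }

        /** Simulates the peer consuming one message; returns null if nothing is queued. */
        String receive() {
            return queue.poll();
        }
    }

    /** Sends `total` messages with no consumer and returns {accepted, rejected}. */
    static int[] flood(int highWaterMark, int total) {
        BoundedPipe pipe = new BoundedPipe(highWaterMark);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < total; i++) {
            if (pipe.trySend("message #" + i)) {
                accepted++;
            } else {
                rejected++; // the sender learns here that the limit was reached
            }
        }
        return new int[] {accepted, rejected};
    }

    public static void main(String[] args) {
        int[] result = flood(50, 60);
        System.out.println("accepted=" + result[0] + " rejected=" + result[1]);
    }
}
```

In this model the sender only notices the limit because the send call reports failure rather than silently discarding or blocking, which is the behaviour the two options above are meant to give on a real socket.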

sherlockedjd commented 6 years ago

Thanks Fred for your response, I shall definitely try this out. Basically, I just want to explore the possibility of detecting, on the publisher side, when messages are being dropped because the high water mark has been reached. That is, when the subscriber is slow it starts dropping messages, the publisher-side queue then fills up as well, and eventually the publisher starts to drop messages, as mentioned in the ZMQ docs for PUB sockets.

sherlockedjd commented 6 years ago

I tried sending messages with the DONTWAIT flag, but it does not appear to be working: errno is always set to EAGAIN (35) after sending any number of messages. Also, the HWM does not seem to be working on the PUB side.

Here's my PUB-side code, if anyone wants to look at it and help me out:

public static void main(String[] args) throws InterruptedException {
    // Prepare our context and publisher
    ZMQ.Context context = ZMQ.context(1);
    ZMQ.Socket publisher = context.socket(ZMQ.PUB);
    boolean receipt;
    int count = 0;
    int error = 0;
    publisher.setSndHWM(50);
    publisher.bind("tcp://localhost:5559");
    String message = "this is message from server1";
    Scanner sc = new Scanner(System.in);
    String input = "Yes";
    Socket socket = new Socket();
    System.out.println(ZMQ.getVersionString());
    long starttime = System.currentTimeMillis();
    for (int i = 0; i < 50; i++)
    //int i=0;
    //while(true)
    {
        message = "this is message from server1";
        message += "#" + i;

        //receipt=publisher.send(message);
        receipt = publisher.send(message, ZMQ.DONTWAIT);
        if (receipt)
        {
            count++;
        }
        if (publisher.errno() == ZError.EAGAIN)
        {
            System.out.println(publisher.errno());
            error++;
        }

        if (i == 0 || i == 5000 || i == 10000 || i == 49000)
        {
            long timeinmillisecs = System.currentTimeMillis();
            long millis = timeinmillisecs % 1000;
            long second = (timeinmillisecs / 1000) % 60;
            long minute = (timeinmillisecs / (1000 * 60)) % 60;
            long hour = (timeinmillisecs / (1000 * 60 * 60)) % 24 + 5;
            hour = hour % 12;

            String time = String.format("%02d:%02d:%02d.%d", hour, minute, second, millis);
            System.out.println("in sleep baby at time" + time);
            Thread.sleep(4000);
            input = sc.next();
            if (input == "Stop")
            {
                break;
            }
        }
        // i++;
        System.out.println(message);
    }
    long stoptime = System.currentTimeMillis();
    long timetaken = stoptime - starttime;
    System.out.println("time elapsed in sending " + count + " msgs: " + timetaken + "failed" + error);
    //publisher.close ();
    //context.term ();
}

fredoboulo commented 6 years ago

You did not call Socket.setXpubNoDrop to disable the lossy behaviour.

As a general rule, errno should be checked only when a call reports an error. The current behaviour could certainly be improved by resetting the value before each API call, but checking errno only when the message could not be sent would help your code.
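The difference between the two ways of checking errno can be sketched without the library. Everything below is hypothetical and only mirrors the shape of the `send(...)` and `errno()` calls in the posted code: the `Sender` interface, the `StickyErrnoSender` fake (which never resets its error code after a failure) and the local `EAGAIN` constant, set to the value 35 reported in this thread, are assumptions for illustration and not JeroMQ classes.

```java
public class ErrnoCheck {
    // Value reported in this thread for EAGAIN; defined locally so no library is needed.
    static final int EAGAIN = 35;

    /** Hypothetical interface with the same shape as the send/errno calls in the thread. */
    interface Sender {
        boolean send(String message);
        int errno();
    }

    /** Fake sender: rejects the first message, accepts the rest, never resets errno. */
    static final class StickyErrnoSender implements Sender {
        private int errno = 0;
        private int calls = 0;

        @Override
        public boolean send(String message) {
            calls++;
            if (calls == 1) {
                errno = EAGAIN;
                return false;
            }
            return true; // errno deliberately keeps its previous value
        }

        @Override
        public int errno() {
            return errno;
        }
    }

    /** Inspects errno after every call, whether or not the send succeeded. */
    static int countUnconditionally(Sender sender, int total) {
        int errors = 0;
        for (int i = 0; i < total; i++) {
            sender.send("message #" + i);
            if (sender.errno() == EAGAIN) {
                errors++;
            }
        }
        return errors;
    }

    /** Inspects errno only when send reports failure. */
    static int countOnFailure(Sender sender, int total) {
        int errors = 0;
        for (int i = 0; i < total; i++) {
            boolean sent = sender.send("message #" + i);
            if (!sent && sender.errno() == EAGAIN) {
                errors++;
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        System.out.println("unconditional: " + countUnconditionally(new StickyErrnoSender(), 10));
        System.out.println("on failure:    " + countOnFailure(new StickyErrnoSender(), 10));
    }
}
```

With this fake, a single failed send is counted as ten errors by the unconditional check and as one error by the guarded check. Whether a stale value explains the numbers seen with the real socket is not confirmed here; the sketch only shows why the guarded form is the safer pattern.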

daveyarwood commented 5 years ago

@sherlockedjd Does the previous post help?

petaryulianov commented 1 year ago

@daveyarwood No, it doesn't seem to work for the TCP protocol: even with socket.setXpubNoDrop(true), the HWM is not respected. Maybe the problem is that the TCP socket also has a buffer, and the messages are buffered there rather than in the JeroMQ queues.

AdamWardVGP commented 7 months ago

I'm also trying to reproduce HWM on an XPUB socket and wondering if there's a solution. socket.setXpubNoDrop doesn't seem to have the documented effect.

  val serverThread = thread {
    val zContext = ZContext()

    val serverSocket = zContext.createSocket(SocketType.XPUB)
    serverSocket.setXpubNoDrop(true)
    serverSocket.hwm = 1

    val bindResult = serverSocket.bind("tcp://*:9000")

    logger.v(LOG_TAG, "Server bind :$bindResult")

    var totalSent = 0

    while (System.currentTimeMillis() < endTime) {
      try {
        val sr1 = serverSocket.sendMore(topic)
        val sr2 = serverSocket.send(msgBody)
        if(!sr1 || !sr2) {
          logger.v(LOG_TAG, "Server send is too fast, HWM overflow. terminating server. ${serverSocket.errno()}")
          break
        }
        totalSent += 1
      } catch(ex: Exception) {
        logger.v(LOG_TAG, "got exception $ex")
      }
    }

    logger.v(LOG_TAG, "Total sent is $totalSent")
    zContext.destroy()
  }

If I let this run for some time, I never hit my log statement. I would expect to see false from my send call and EAGAIN from errno.