zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.72k stars 2.36k forks source link

high water marks not unbounded for PUB/SUB #4395

Open stvales opened 2 years ago

stvales commented 2 years ago

Issue description

When the HWM is set to 0 (no limit) on PUB/SUB sockets, a limit still seems to apply and messages are dropped.

Environment

Minimal test code / Steps to reproduce the issue

A sender and a receiver that each use both PUB/SUB and PUSH/PULL sockets, so the two patterns can be compared.

Sender:

#include <czmq.h>

/* Number of messages to send per pattern; parsed from argv[3] in main(). */
static int number = 0;

/* Sockets shared between main() and the send_messages() timer handler. */
static zsock_t *pub = NULL;
static zsock_t *push = NULL;

/*
 * zloop timer handler that sends the sequence 1..number on one socket.
 *
 * arg is the string passed to zloop_timer() in main() and selects the action:
 *   "stop"        - return -1, which ends the zloop reactor;
 *   "pub"         - send on the PUB socket;
 *   anything else - send on the PUSH socket.
 * Each message carries the integer i (zsock "i" picture), so the receiver
 * can detect gaps in the sequence.
 *
 * Returns 0 to keep the reactor running, or -1 to stop it (on "stop", or
 * when a send fails).
 */
int send_messages (zloop_t *loop, int timer_id, void *arg){
    (void) loop;
    (void) timer_id;

    /* arg points to a string literal; never write through it. */
    const char *type = (const char *) arg;
    if (streq(type, "stop"))
        return -1;

    /* The target socket is loop-invariant; select it once. */
    zsock_t *target = streq(type, "pub") ? pub : push;
    for (int i = 1; i <= number; i++) {
        printf("sending with %s %d/%d\n", type, i, number);
        if (zsock_send(target, "i", i) == -1) {
            /* Report local send failures so they are not mistaken for
             * messages lost in transit on the receiving side. */
            fprintf(stderr, "send failed with %s at %d/%d\n", type, i, number);
            return -1;
        }
    }
    return 0;
}

int main(int argc, const char * argv[]) {
    number = atoi(argv[3]);

    pub = zsock_new_pub(argv[1]);
    zsock_set_sndhwm(pub, 0);
    zsock_set_rcvhwm(pub, 0);

    push = zsock_new_push(argv[2]);
    zsock_set_sndhwm(push, 0);
    zsock_set_rcvhwm(push, 0);

    zloop_t *loop = zloop_new();
    zloop_timer(loop, 2000, 1, send_messages, "push");
    zloop_timer(loop, 4000, 1, send_messages, "pub");
    zloop_timer(loop, 6000, 1, send_messages, "stop");
    zloop_start(loop);
    zloop_destroy(&loop);
    zsock_destroy (&pub);
    zsock_destroy (&push);
    return 0;
}

Receiver:

#include <czmq.h>

/* Parsed from argv[3] in main(); not otherwise read by the receiver. */
static int number = 0;

/* Per-pattern count of messages seen, used by zmq_reader() to detect gaps. */
static int sub_msg_nb = 0;
static int pull_msg_nb = 0;

/* Sockets created in main() and polled by the zloop reactor. */
static zsock_t *sub = NULL;
static zsock_t *pull = NULL;

/*
 * zloop reader handler: receives one integer message from `reader` and checks
 * it against the expected sequence number for that pattern.
 *
 * arg is the string passed to zloop_reader() in main(): "sub" selects the SUB
 * counter, anything else the PULL counter. Counters start at 0 and the sender
 * numbers messages from 1, so the n-th message is expected to carry value n.
 * On a mismatch the gap is reported and the counter is resynchronised to the
 * received value.
 *
 * Returns 0 to keep the reactor running, -1 if the receive fails.
 */
int zmq_reader (zloop_t *loop, zsock_t *reader, void *arg){
    (void) loop;

    /* arg points to a string literal; never write through it. */
    const char *type = (const char *) arg;
    int value = 0;
    if (zsock_recv(reader, "i", &value) == -1) {
        /* Without this check value would stay 0 and be reported as a bogus
         * negative "loss". */
        fprintf(stderr, "receive failed on %s\n", type);
        return -1;
    }

    int *counter = streq(type, "sub") ? &sub_msg_nb : &pull_msg_nb;
    ++(*counter);
    if (value > *counter) {
        printf("lost message on %s - #%d value is %d (%d dropped messages)\n",
               type, *counter, value, value - *counter);
        *counter = value; /* catch up */
    }
    else if (value < *counter) {
        /* The sequence went backwards, presumably because the sender was
         * restarted; this is not a loss. */
        printf("sequence restarted on %s - #%d value is %d\n", type, *counter, value);
        *counter = value;
    }
    return 0;
}

int main(int argc, const char * argv[]) {
    number = atoi(argv[3]);

    sub = zsock_new_sub(argv[1], "");
    zsock_set_sndhwm(sub, 0);
    zsock_set_rcvhwm(sub, 0);

    pull = zsock_new_pull(argv[2]);
    zsock_set_sndhwm(pull, 0);
    zsock_set_rcvhwm(pull, 0);

    zloop_t *loop = zloop_new();
    zloop_reader(loop, sub, zmq_reader, "sub");
    zloop_reader(loop, pull, zmq_reader, "pull");
    zloop_reader_set_tolerant(loop, sub);
    zloop_reader_set_tolerant(loop, pull);
    zloop_start(loop);
    zloop_destroy(&loop);
    zsock_destroy (&sub);
    zsock_destroy (&pull);
    return 0;
}

Run over the tcp and ipc transports as follows (TCP was tested both on a single machine and across two machines):

./receiver tcp://192.168.1.196:12345 tcp://192.168.1.196:12346 10000
./sender tcp://192.168.1.196:12345 tcp://192.168.1.196:12346 10000

./receiver ipc:///tmp/pub ipc:///tmp/push 10000 
./sender ipc:///tmp/pub ipc:///tmp/push 10000 

What's the actual result?

In all cases, PUSH/PULL delivery is complete (all 10 000 messages are received), while PUB/SUB loses some messages (the number lost varies from run to run).

lost message - #3501 value is 3721 (220 dropped messages)
lost message - #8721 value is 8758 (37 dropped messages)

NB: this is NOT a late-joiner problem, because the dropped messages are in the middle of the sequence, not at the beginning. We ruled this out by starting the receiver before the sender, and by having the sender wait 2 seconds before sending any messages.

NB: the problem only appears when more than 1000 messages are sent, 1000 being the default HWM value.

What's the expected result?

When `zsock_set_sndhwm` and `zsock_set_rcvhwm` are used to set the HWM to zero on all sockets, one would expect every message to be delivered over PUB/SUB, just as it is over PUSH/PULL.

kstauffer commented 1 year ago

Duplicate of #4394