erickt / rust-zmq

Rust zeromq bindings.
Apache License 2.0

Many dropped messages with push/pull sockets via IPC #327

Closed: detly closed this issue 3 years ago

detly commented 3 years ago

Using:

I have found that a small example program with a PULL socket receives only about 1 in every 50 messages that I send from a PUSH socket over IPC.

In one terminal, I'll compile the C example below and run:

```shell
while true; do build/conf_test; done
```

Hold down Ctrl+C to stop the loop when you're done.

In another terminal, I run cargo run and see:

```text
Waiting
Got: "Hello,"
Waiting
Got: "world!"
Waiting
^C
```

The C program's loop can easily run a couple of hundred times while the Rust code prints only a couple of messages.

The Rust code:

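```rust
fn main() {
    let zctx = zmq::Context::new();
    if let Ok(zsock) = zctx.socket(zmq::PULL) {
        // Connect to the abstract-namespace IPC endpoint that the C sender binds.
        zsock.connect("ipc://@flub").unwrap();
        loop {
            println!("Waiting");
            // recv_msg returns a single frame, so each frame of a
            // multipart message is printed on its own line.
            let msg = zsock.recv_msg(0).unwrap();
            println!("Got: {:?}", msg.as_str().unwrap());
        }
    } else {
        println!("Could not open ZeroMQ socket");
    }
}
```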

The C (CZMQ) code:

```c
#include <stdio.h>
#include <czmq.h>

int main(void)
{
    printf("O: Creating sockets\n");
    // The leading '@' tells CZMQ to bind rather than connect; "@flub" after
    // ipc:// names a Linux abstract-namespace socket.
    zsock_t * to_conf = zsock_new_push("@ipc://@flub");

    if (!to_conf) {
        fprintf(stderr, "E: Could not open socket\n");
        return 1;
    }

    // One message made of two frames.
    zmsg_t * conf_data = zmsg_new();
    const int data1_result = zmsg_addstr(conf_data, "Hello,");
    const int data2_result = zmsg_addstr(conf_data, "world!");

    if (data1_result || data2_result)
    {
        fprintf(stderr, "E: Could not encode message\n");
        zmsg_destroy(&conf_data);
        zsock_destroy(&to_conf);
        return 1;
    }

    printf("O: Sending message\n");
    // zmsg_send destroys the message after a successful send.
    const int result = zmsg_send(&conf_data, to_conf);

    if (result)
    {
        fprintf(stderr, "E: Error sending message: %d\n", result);
        zmsg_destroy(&conf_data);  // no-op if already destroyed
    }

    zsock_destroy(&to_conf);
    return 0;
}
```

I have tried this with IPC sockets, both "abstract" ones (i.e. not appearing in the filesystem; note the @ after ipc://) and regular filesystem ones, as well as with TCP. The result is the same in every case.

Am I using the library wrong here? I don't have another system to try this on to check whether it's an OS issue, and I'm relatively new to Rust, so maybe I've made a mistake, but the fact that it works sometimes has me confused.

detly commented 3 years ago

Please excuse me: this is completely unrelated to your library and entirely due to my misuse of CZMQ. My sending code (the PUSH socket) was closing the socket before the queued message had been transmitted. The default "linger" for ZeroMQ sockets is -1 (wait indefinitely for pending messages) in the low-level libzmq API, but 0 (discard pending messages immediately) in CZMQ. Setting the linger to -1 in my C code makes everything work.