jean-airoldie / libzmq-rs

A strict subset of ØMQ with an ergonomic API.
https://jean-airoldie.github.io/libzmq-rs/
Apache License 2.0
58 stars 4 forks source link

libzmq does not ensure all messages are sent before socket is dropped #129

Open awaited-hare opened 4 years ago

awaited-hare commented 4 years ago

Describe the bug A clear and concise description of what the bug is.

libzmq does not ensure all messages are sent before socket is dropped.

To Reproduce Give a minimal reproduction example of the bug, if applicable.

The following code hangs:

    let input_addr: TcpAddr = "127.0.0.1:9999".try_into().expect("cannot parse zmq addr");
    let mut socket_builder = libzmq::GatherBuilder::new();
    let socket = socket_builder.bind(&[input_addr.clone()]).recv_hwm(5).build().expect("cannot \
        build zmq socket");
    {
        let mut socket_builder = libzmq::ScatterBuilder::new();
        let socket = socket_builder.connect(&[input_addr]).send_hwm(5).build().expect("cannot\
             build zmq socket");
        socket.send("test message").expect("zmq failed to send message");
        log::debug!("message sent"); // actually not sent (bug)
    }
    let message = socket.recv_msg().expect("zmq failed to recv bytes");
    log::debug!("cannot reach here because the message cannot be received!");
    log::debug!("zmq recv message {:?}", message);

Expected behavior A clear and concise description of what you expected to happen.

The messages should be sent before the socket is dropped.

Platform:

jean-airoldie commented 4 years ago

Currently the linger period is disabled for all socket using the ZMQ_BLOCKY socket option. I swear I documented the default socket options that I used somewhere but I can't find it in the doc.

https://github.com/jean-airoldie/libzmq-rs/blob/c2ea5bcaac736e0e16c38e25bb07dbbdf2ce7723/libzmq/src/ctx.rs#L365

If you have a valid use case, we could expose this API publicly.

However I don't think that your example use case is a valid use because of the nature of the gather and scatter transport. ZMQ provides no synchronization mechanism for PUB & SUB nor GATHER & SCATTER because "the data stream is [considered] infinite and has no start and no end and therefore cannot be used for reliable messaging. That's why messages are dropped on a slow subscriber (albeit that behavior can be configured).

The way the ZMQ guide suggest to do the sync is to either sleep a given amount of time (which is a terrible idea) or use another reliable socket to do the sync (which i think is equally terrible). I think the ideal solution would be for the Scatter socket to be notified when a Gather socket subscribes, but thats out of the scope of ZMQ I'm afraid.

jean-airoldie commented 4 years ago

I'll add some clarification on the default behavior in the libzmq::prelude::Socket doc.