erickt / rust-zmq

Rust zeromq bindings.
Apache License 2.0

impl Sendable for &mut Message? #322

Open nicholasfagan opened 3 years ago

nicholasfagan commented 3 years ago

I've been trying to send and receive multipart messages one part at a time, reusing the same Message for each part to reduce allocations. However, send takes the Message by value, so it is moved on every call and I have to call Message::new() for each part of the multipart message.

In C, I would simply reuse the zmq_msg_t*, since zmq_msg_send does not consume it: the internal message pointer is moved to the socket's queue and the message's internals are reset.

From the zeromq api documentation:

The zmq_msg_t structure passed to zmq_msg_send() is nullified during the call. If you want to send the same message to multiple sockets you have to copy it (e.g. using zmq_msg_copy()). A successful invocation of zmq_msg_send() does not indicate that the message has been transmitted to the network, only that it has been queued on the socket and ØMQ has assumed responsibility for the message. You do not need to call zmq_msg_close() after a successful zmq_msg_send().

My interpretation of this is that zmq_msg_t is still perfectly good and safe to re-use after a call to zmq_msg_send.
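To make that reading concrete, here is a toy model of "nullified but still valid" in plain Rust. ToyMsg and toy_send are hypothetical stand-ins I made up for illustration, not libzmq or rust-zmq types, and the sketch assumes my interpretation of the documentation is right: the payload moves to the queue and the caller keeps an empty, reusable message.

```rust
// Toy model only: `ToyMsg` and `toy_send` are hypothetical stand-ins,
// not libzmq or rust-zmq types.
#[derive(Debug, Default)]
struct ToyMsg {
    data: Vec<u8>,
}

/// Models the documented behaviour as I read it: the payload moves to the
/// queue and the caller's message is left empty but still valid.
fn toy_send(msg: &mut ToyMsg, queue: &mut Vec<Vec<u8>>) {
    queue.push(std::mem::take(&mut msg.data));
}

fn main() {
    let mut queue = Vec::new();
    let mut msg = ToyMsg { data: b"part 1".to_vec() };

    toy_send(&mut msg, &mut queue);
    assert!(msg.data.is_empty()); // emptied by the send, not destroyed

    // The same struct can be filled and sent again.
    msg.data.extend_from_slice(b"part 2");
    toy_send(&mut msg, &mut queue);

    assert_eq!(queue, vec![b"part 1".to_vec(), b"part 2".to_vec()]);
}
```

This only models ownership of the payload; it says nothing about what libzmq does internally.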

If my interpretation is correct, then we could do this:

impl<'a> Sendable for &'a mut Message {
    fn send(self, socket: &Socket, flags: i32) -> Result<()> {
        zmq_try!(unsafe { zmq_sys::zmq_msg_send(msg_ptr(self), socket.sock, flags as c_int) });
        Ok(())
    }
}

which would let me do this:

let recv_sock: Socket = ...;
let send_sock: Socket = ...;
// Alloc a message once at the start
let mut msg = Message::new();
loop {
    recv_sock.recv(&mut msg, 0)?;
    // process msg ...
    if msg.get_more() {
        send_sock.send(&mut msg, zmq::SNDMORE)?;
    } else {
        send_sock.send(&mut msg, 0)?;
        break;
    }
}

instead of this:

let recv_sock: Socket = ...;
let send_sock: Socket = ...;
loop {
    // Alloc a message for every part of the multipart message
    let mut msg = Message::new();
    recv_sock.recv(&mut msg, 0)?;
    // process msg ...
    if msg.get_more() {
        send_sock.send(msg, zmq::SNDMORE)?;
    } else {
        send_sock.send(msg, 0)?;
        break;
    }
}
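
For anyone who wants to try the idea without linking against libzmq, here is a self-contained sketch of how a by-value impl and a by-reference impl could sit side by side on the same trait. MockMessage, MockSocket, MockSendable and forward are all hypothetical names for illustration; the real Sendable trait also takes flags and returns a Result, which I have left out to keep the sketch short.

```rust
use std::cell::RefCell;

// Hypothetical stand-ins for illustration; not the rust-zmq types.
#[derive(Default)]
struct MockMessage {
    data: Vec<u8>,
}

#[derive(Default)]
struct MockSocket {
    queue: RefCell<Vec<Vec<u8>>>,
}

trait MockSendable {
    fn send(self, socket: &MockSocket);
}

// Existing style: the message is consumed by the send.
impl MockSendable for MockMessage {
    fn send(self, socket: &MockSocket) {
        socket.queue.borrow_mut().push(self.data);
    }
}

// Proposed style: the payload is moved out, the message stays usable.
impl<'a> MockSendable for &'a mut MockMessage {
    fn send(self, socket: &MockSocket) {
        socket.queue.borrow_mut().push(std::mem::take(&mut self.data));
    }
}

impl MockSocket {
    fn send<T: MockSendable>(&self, data: T) {
        data.send(self);
    }
}

/// Forwards every part through one reused message and returns that message.
fn forward(parts: &[&str], socket: &MockSocket) -> MockMessage {
    let mut msg = MockMessage::default();
    for part in parts {
        msg.data.extend_from_slice(part.as_bytes()); // stands in for recv
        socket.send(&mut msg);
    }
    msg
}

fn main() {
    let socket = MockSocket::default();
    let msg = forward(&["a", "b", "c"], &socket);
    assert!(msg.data.is_empty());
    assert_eq!(socket.queue.borrow().len(), 3);
    socket.send(msg); // the by-value send still compiles
}
```

The point of the sketch is that the two impls do not conflict, so adding the &mut one would not break existing callers that pass a Message by value.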