erickt / rust-zmq

Rust zeromq bindings.
Apache License 2.0

Issues with the ZMQ_CONFLATE setting #296

Closed: vegapit closed this issue 4 years ago

vegapit commented 4 years ago

Hello,

I am running into issues when using a Subscriber with the Conflate setting. Here is the code for my Subscriber struct:

use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Result;

pub struct Subscriber {
    socket: zmq::Socket
}

impl Subscriber {

    pub fn new(zmqurl: &str, conflate: bool) -> Subscriber {
        let ctx = zmq::Context::new();
        let socket = ctx.socket(zmq::SUB).unwrap();
        socket.set_conflate( conflate ).expect("Conflate setting failed");
        socket.connect( zmqurl ).expect("ZMQ Stream connection failed");

        Subscriber{
            socket: socket
        }
    }

    /// Subscribe to a market channel
    pub fn subscribe(&mut self, channel: &str) {
        self.socket.set_subscribe( channel.as_bytes() ).expect("Subscription Failed");
    }

    /// Unsubscribe from a market channel
    pub fn unsubscribe(&mut self, channel: &str) {
        self.socket.set_unsubscribe( channel.as_bytes() ).expect("Unsubscription Failed");
    }

    /// Wait for next message
    pub fn listen<T: DeserializeOwned>(&mut self) -> Result<T> {
        let data = self.socket.recv_string(0).expect("data extraction failed").unwrap();
        serde_json::from_str( &data )
    }

}

It works like a charm when Conflate is not turned on, but as soon as I enable it, I get the following error after a few messages:

Assertion failed: !_more (/Users/vegapit/.cargo/registry/src/github.com-1ecc6299db9ec823/zeromq-src-0.1.8+4.3.2/vendor/src/fq.cpp:112)
Abort trap: 6

Any ideas on what the problem could be?

rotty commented 4 years ago

Looking at the libzmq source, it seems this assert in fq.cpp is the one being triggered:

//  Check the atomicity of the message.
//  If we've already received the first part of the message
//  we should get the remaining parts without blocking.
zmq_assert (!_more);

Are you using multipart messages in combination with ZMQ_CONFLATE? If so, this is noted as not supported in zmq_setsockopt(3):

Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.

However, I would not have guessed that "not supported" means "crashing on an assertion" :man_shrugging:.

I've prepared a small test case which shows that ZMQ_CONFLATE on the receiver side appears to work, at least when used with single-part messages on a PUSH/PULL socket pair. I'll reference this issue when I send out the PR for that test, so it should show up in this thread soonish™.
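
If the publisher is indeed sending the topic and the JSON payload as two separate parts (an assumption, the thread does not show the publisher code), one possible workaround is to pack both into a single frame. SUB sockets match subscriptions as a prefix of the first frame, so a single-frame message that starts with the topic should still be delivered. The sketch below uses only the standard library; `encode_frame`, `split_frame` and the single-space separator are hypothetical names and choices for illustration, not part of rust-zmq.

```rust
/// Join topic and payload into ONE frame: "<topic> <payload>".
/// Hypothetical helper; the single-space separator is an assumption,
/// so topics must not contain spaces.
pub fn encode_frame(topic: &str, payload: &str) -> Vec<u8> {
    let mut frame = Vec::with_capacity(topic.len() + 1 + payload.len());
    frame.extend_from_slice(topic.as_bytes());
    frame.push(b' ');
    frame.extend_from_slice(payload.as_bytes());
    frame
}

/// Split a received frame back into (topic, payload) at the first space.
/// Returns None if the frame is not valid UTF-8 or contains no space.
pub fn split_frame(frame: &[u8]) -> Option<(&str, &str)> {
    let text = std::str::from_utf8(frame).ok()?;
    text.split_once(' ')
}
```

On the publisher side the encoded frame would go out in one send call without the SNDMORE flag; on the subscriber side, the payload half returned by `split_frame` could be handed to `serde_json::from_str` inside `listen`.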

vegapit commented 4 years ago

I will need to read the doc a little longer next time =:]