erickt / rust-zmq

Rust zeromq bindings.
Apache License 2.0
900 stars 194 forks source link

PUB-SUB lost data even though sync with REQ-REP #192

Closed ghost closed 5 years ago

ghost commented 7 years ago

I implemented syncpub and syncsub (the zguide example). The C code works fine, but the Rust version fails: syncsub can't receive all of the publisher's data.

rust impl

syncpub

extern crate zmq;

/// Number of subscribers that must check in before broadcasting starts.
const SUBSCRIBERS_EXPECTED: i32 = 10;

/// Synchronized publisher.
///
/// Binds a PUB socket on port 5561 and a REP socket on port 5562, waits for
/// `SUBSCRIBERS_EXPECTED` synchronization requests on the REP socket, then
/// publishes 1,000,000 "Rhubarb" messages followed by a single "END".
///
/// # Panics
///
/// Panics if any socket operation (create, configure, bind, send, receive)
/// fails, or if a synchronization request is not valid UTF-8.
fn main() {
    let context = zmq::Context::new();

    let publisher = context
        .socket(zmq::PUB)
        .expect("failed to create PUB socket");

    // The send high-water mark has to be configured BEFORE `bind`: libzmq only
    // applies ZMQ_SNDHWM to bind/connect calls made after the option is set.
    // Setting it after binding leaves the default limit in effect, so the
    // publisher drops messages once its queue fills up.
    publisher
        .set_sndhwm(1_100_000)
        .expect("failed to set send high-water mark");
    publisher
        .bind("tcp://*:5561")
        .expect("failed to bind publisher to tcp://*:5561");

    let syncservice = context
        .socket(zmq::REP)
        .expect("failed to create REP socket");
    syncservice
        .bind("tcp://*:5562")
        .expect("failed to bind sync service to tcp://*:5562");

    println!("Waiting for subscribers");
    for _ in 0..SUBSCRIBERS_EXPECTED {
        // The request body is irrelevant; receiving it is the signal.
        let _ = syncservice.recv_string(0).unwrap().unwrap();
        // REP must answer every request before it can receive the next one.
        syncservice.send_str("", 0).unwrap();
    }

    println!("Broadcasting messages");
    for _ in 0..1_000_000 {
        publisher.send_str("Rhubarb", 0).unwrap();
    }

    // Sentinel telling subscribers to stop counting.
    publisher.send_str("END", 0).unwrap();
}

syncsub

extern crate zmq;

use std::{thread, time};

/// Synchronized subscriber.
///
/// Connects a SUB socket to port 5561 with an empty subscription prefix,
/// sleeps for one second, performs one request/reply exchange on port 5562,
/// then counts every message received until one equal to "END" arrives and
/// prints the count.
fn main() {
    let context = zmq::Context::new();

    let subscriber = context.socket(zmq::SUB).unwrap();
    assert!(subscriber.connect("tcp://localhost:5561").is_ok());
    // Empty prefix: no filtering on message content.
    subscriber.set_subscribe(b"").unwrap();

    // Pause before synchronizing (the C counterpart notes "0MQ is so fast, we
    // need to wait a while").
    let settle_time = time::Duration::from_millis(1000);
    thread::sleep(settle_time);

    let syncclient = context.socket(zmq::REQ).unwrap();
    assert!(syncclient.connect("tcp://localhost:5562").is_ok());

    // One request/reply round trip; the payloads themselves are ignored.
    syncclient.send_str("", 0).unwrap();
    let _ = syncclient.recv_string(0).unwrap().unwrap();

    // Count updates until the "END" sentinel shows up.
    let mut received = 0;
    while subscriber.recv_string(0).unwrap().unwrap() != "END" {
        received += 1;
    }
    println!("Received {} update_nbr", received);
}

c impl

syncpub

//  Synchronized publisher

#include "zhelpers.h"
#define SUBSCRIBERS_EXPECTED  10  //  Number of sync requests to collect first

//  Synchronized publisher: collects SUBSCRIBERS_EXPECTED sync requests on a
//  REP socket, then publishes 1M "Rhubarb" messages followed by "END".
int main (void)
{
    void *context = zmq_ctx_new ();

    //  Publishing socket; the send high-water mark is set before binding
    void *publisher = zmq_socket (context, ZMQ_PUB);

    int sndhwm = 1100000;
    zmq_setsockopt (publisher, ZMQ_SNDHWM, &sndhwm, sizeof (int));

    zmq_bind (publisher, "tcp://*:5561");

    //  Reply socket on which synchronization requests arrive
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    //  One request/reply exchange per expected subscriber
    printf ("Waiting for subscribers\n");
    for (int subscribers = 0; subscribers < SUBSCRIBERS_EXPECTED; subscribers++) {
        //  The request content is discarded; only its arrival matters
        char *request = s_recv (syncservice);
        free (request);
        //  Empty reply completes the exchange
        s_send (syncservice, "");
    }

    //  Publish exactly 1M updates, then the END marker
    printf ("Broadcasting messages\n");
    int remaining = 1000000;
    while (remaining > 0) {
        s_send (publisher, "Rhubarb");
        remaining--;
    }

    s_send (publisher, "END");

    //  Close both sockets before destroying the context
    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_ctx_destroy (context);
    return 0;
}

syncsub

//  Synchronized subscriber

#include "zhelpers.h"
#include <unistd.h>

//  Synchronized subscriber: connects, performs one sync exchange with the
//  publisher, then counts updates until "END" and prints the total.
int main (void)
{
    void *context = zmq_ctx_new ();

    //  Step 1: connect the SUB socket with an empty (zero-length) filter
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5561");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);

    //  Pause for one second before synchronizing
    sleep (1);

    //  Step 2: one request/reply exchange with the publisher's REP socket
    void *syncclient = zmq_socket (context, ZMQ_REQ);
    zmq_connect (syncclient, "tcp://localhost:5562");

    s_send (syncclient, "");

    //  The reply content is discarded; only its arrival matters
    char *reply = s_recv (syncclient);
    free (reply);

    //  Step 3: count updates until the END marker arrives
    int update_nbr = 0;
    for (;;) {
        char *update = s_recv (subscriber);
        int finished = (strcmp (update, "END") == 0);
        free (update);
        if (finished)
            break;
        update_nbr++;
    }
    printf ("Received %d updates\n", update_nbr);

    //  Close both sockets before destroying the context
    zmq_close (subscriber);
    zmq_close (syncclient);
    zmq_ctx_destroy (context);
    return 0;
}
jkarneges commented 5 years ago

I think the subscribers need to set rcvhwm (the receive high-water mark); otherwise messages may be dropped.

rotty commented 5 years ago

Since @jkarneges's suggestion indicates a plausible cause, there has been no response to it, and I don't see how this could be a bug in the bindings, I'll close this now. Feel free to re-open with a justification if you still think this is a bug.