erickt / rust-zmq

Rust zeromq bindings.
Apache License 2.0

EAGAIN / socket::send #240

Closed: fabienjuif closed this issue 5 years ago

fabienjuif commented 5 years ago

Hi 👋!

As you can see here, I tried to write a custom broker:

The JS version hangs for some reason. I wanted to try it in Rust because I don't know whether the problem is the JS binding or poor code design on my side.

But I rely on socket::send returning an error when a message can't be sent (this is how I decide that a worker is dead and try to send the task to another one). Right now, socket::send doesn't seem to return an error when a message can't be delivered (I tried both with and without the zmq::DONTWAIT flag).

My socket is in ROUTER mode (SocketType::ROUTER). With this code, if I kill the "worker" process, there is no error:

socket.send(&worker_name, zmq::SNDMORE & zmq::DONTWAIT).unwrap();
socket.send("", zmq::SNDMORE & zmq::DONTWAIT).unwrap(); // TODO: this could be removed!
socket.send(&task.payload, zmq::DONTWAIT).unwrap();

Same for this code:

socket.send(&worker_name, zmq::SNDMORE).unwrap();
socket.send("", zmq::SNDMORE).unwrap(); // TODO: this could be removed!
socket.send(&task.payload, 0).unwrap();

Is this something I'm misunderstanding, or is it a known limitation of rust-zmq?

Versions:

Thank you!

kysely commented 5 years ago

Hey @fabienjuif I believe you have to use bitwise OR (|) to combine the flags, not AND. It works perfectly for me.
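To see why `&` loses both flags, here is a minimal sketch of the bit arithmetic. It assumes the values from the libzmq C header (`ZMQ_DONTWAIT = 1`, `ZMQ_SNDMORE = 2`), which rust-zmq exposes as the `i32` constants `zmq::DONTWAIT` and `zmq::SNDMORE`; the local constants and the `has_flag` helper are illustrative stand-ins so the snippet runs without the crate.

```rust
// Local stand-ins mirroring zmq.h: ZMQ_DONTWAIT = 1, ZMQ_SNDMORE = 2.
const DONTWAIT: i32 = 1;
const SNDMORE: i32 = 2;

/// Hypothetical helper: true if `flag` is set in `flags`.
fn has_flag(flags: i32, flag: i32) -> bool {
    (flags & flag) != 0
}

fn main() {
    let with_and = SNDMORE & DONTWAIT; // 0b10 & 0b01 == 0: no flag survives
    let with_or = SNDMORE | DONTWAIT; // 0b10 | 0b01 == 0b11: both flags set

    println!("SNDMORE & DONTWAIT = {}", with_and);
    println!("SNDMORE | DONTWAIT = {}", with_or);
    println!("AND keeps SNDMORE: {}", has_flag(with_and, SNDMORE));
    println!("OR keeps SNDMORE: {}", has_flag(with_or, SNDMORE));
}
```

Since `SNDMORE & DONTWAIT` evaluates to 0, the original calls were effectively `send(..., 0)`: each frame went out without the "more" marker and without non-blocking mode.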

fabienjuif commented 5 years ago

Hey @kysely !

Thank you for your help! You are right about |, I am dumb ^^'. But I did change that, and the result is the same:

            socket.send(&worker_name, zmq::SNDMORE | zmq::DONTWAIT).expect("send should work 1");
            socket.send("", zmq::SNDMORE | zmq::DONTWAIT).expect("send should work 2"); // TODO: this could be removed!
            socket.send(&task.payload, zmq::DONTWAIT).expect("send should work 3");

My expect calls never panic, sadly 😢

https://github.com/fabienjuif/zeromq-rs-tryout/blob/master/src/main.rs#L149

fabienjuif commented 5 years ago

I made a short video to show you the issue I have: https://i.imgur.com/63ncAbx.mp4

  1. I run the "broker" on the far left
  2. I run a worker
  3. I disconnect the worker
    • You can see that the "broker" still sees a worker and doesn't drop it right away
    • This is expected: the worker should only be dropped when we try to send it a message and get an error
  4. I run the client
  5. We see that the "broker" sends the messages to the worker (which is no longer there) without panicking with "send should work"
    • It should print "send should work"

kysely commented 5 years ago

Hmm, I overlooked this at first, but the DONTWAIT flag has no effect on ROUTER sockets.

When a ROUTER can't deliver a message (either because the identity is unknown or because the send buffer is full), it simply drops the message. It never blocks, so DONTWAIT never gets a chance to return an error.

You can check this against the official zmq_socket docs, in the “Action in mute state” entry for ROUTER.

kysely commented 5 years ago

I also made a toy example just to make sure.

I intentionally set the high-water mark for the send buffer to 1 message, so that we hit the mute state when trying to send 2 messages.

You'll see that the ROUTER finishes successfully: when it cannot buffer the first message for sending (because the identity is unknown), it simply drops the message.

The DEALER, on the other hand, successfully stores the first message in its send buffer, but it cannot do the same with the second message (because of the high-water mark). Since we use the DONTWAIT flag, send returns an error, as expected (without DONTWAIT, it would simply block in the send call).

fn router_example(ctx: &zmq::Context) {
    let sock = ctx.socket(zmq::ROUTER).unwrap();

    // Limit the send buffer to 1 message so that we enter the "mute state"
    // when trying to send out more than 1 message. Set it before bind, since
    // most socket options only affect connections made after they are set.
    sock.set_sndhwm(1).unwrap();
    sock.bind("tcp://*:5555").unwrap();

    println!("\nRunning ROUTER example");
    for i in 0..2 {
        println!("Sending out msg #{}...", i);
        // No peer has the identity "IDENTITY", so the ROUTER silently drops
        // the message and every send still returns Ok
        sock.send("IDENTITY", zmq::DONTWAIT | zmq::SNDMORE).unwrap();
        sock.send("HEADER", zmq::DONTWAIT | zmq::SNDMORE).unwrap();
        sock.send("PAYLOAD", zmq::DONTWAIT).unwrap();
        println!("Message #{} sent", i);
    }
}

fn dealer_example(ctx: &zmq::Context) {
    let sock = ctx.socket(zmq::DEALER).unwrap();

    // Same 1-message send buffer, set before connect
    sock.set_sndhwm(1).unwrap();
    // Don't wait for undelivered messages on close, so the context can shut
    // down even though the buffered message has no peer to go to
    sock.set_linger(0).unwrap();
    sock.connect("tcp://localhost:5555").unwrap();

    println!("\nRunning DEALER example");
    for i in 0..2 {
        println!("Sending out msg #{}...", i);
        // The first message fits in the buffer; the second one hits the
        // high-water mark, so with DONTWAIT send returns EAGAIN and this
        // unwrap panics (on purpose, to show the error)
        sock.send("HEADER", zmq::DONTWAIT | zmq::SNDMORE).unwrap();
        sock.send("PAYLOAD", zmq::DONTWAIT).unwrap();
        println!("Message #{} buffered", i);
    }
}

fn main() {
    let ctx = zmq::Context::new();
    router_example(&ctx);
    dealer_example(&ctx);
}
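The DEALER half of the example hinges on the high-water mark. As a rough mental model (a toy sketch, not libzmq's implementation), the outgoing pipe can be pictured as a bounded queue that refuses a non-blocking send once it already holds `hwm` messages. `OutPipe` and `SendResult` are hypothetical names, and the model simply counts each whole multipart message as one unit.

```rust
use std::collections::VecDeque;

/// Hypothetical outcome of a non-blocking send.
#[derive(Debug, PartialEq)]
enum SendResult {
    Queued,
    WouldBlock, // what libzmq reports as EAGAIN when DONTWAIT is set
}

/// Toy model of an outgoing pipe with no peer reading from it.
struct OutPipe {
    hwm: usize,
    queue: VecDeque<Vec<String>>, // each entry is one multipart message
}

impl OutPipe {
    fn new(hwm: usize) -> Self {
        OutPipe { hwm, queue: VecDeque::new() }
    }

    /// Non-blocking send: queue the whole message or refuse it outright.
    fn send_dontwait(&mut self, frames: &[&str]) -> SendResult {
        if self.queue.len() >= self.hwm {
            // Mute state: the buffer is full and DONTWAIT forbids waiting
            return SendResult::WouldBlock;
        }
        self.queue.push_back(frames.iter().map(|f| f.to_string()).collect());
        SendResult::Queued
    }
}

fn main() {
    let mut pipe = OutPipe::new(1); // analogous to set_sndhwm(1)
    for i in 0..2 {
        let result = pipe.send_dontwait(&["HEADER", "PAYLOAD"]);
        println!("msg #{}: {:?}", i, result);
    }
}
```

Without DONTWAIT, the real socket would wait for room in the queue instead of returning `WouldBlock`, which is why the blocking variant just hangs in `send`.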
fabienjuif commented 5 years ago

Hm ok, surprisingly it works the way I described it with the js binding 🤔 : if send doesn't find the worker it drops an error.

I will re-read your response and read the pointer on doc you give me.

Did you get the concept of what I tried to achieve? If so, do you think this is dumb or should I use a different socket type?

Thank you again!

kysely commented 5 years ago

I quickly looked at your JS load balancer and there is one important difference.

You enabled the ZMQ_ROUTER_MANDATORY option on your ROUTER socket (only in the JS version, not in the Rust one). As the socket docs I linked above explain, this matters because:

When a ZMQ_ROUTER socket enters the mute state due to having reached the high water mark for all peers, then any messages sent to the socket shall be dropped until the mute state ends. Likewise, any messages routed to a peer for which the individual high water mark has been reached shall also be dropped, unless ZMQ_ROUTER_MANDATORY socket option is set.

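By default, a ROUTER drops a message if it cannot route it to the given identity or if that peer's send buffer is full. With ZMQ_ROUTER_MANDATORY set, send returns an error instead: EHOSTUNREACH when the identity cannot be routed, or EAGAIN when the high-water mark is reached and DONTWAIT is used.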

You can do the same in Rust via

socket.set_router_mandatory(true).unwrap();
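The rules quoted above can be summarized as a small decision function. This is a hypothetical model written from the zmq_setsockopt description of ZMQ_ROUTER_MANDATORY, not code from libzmq or rust-zmq; `RouterOutcome` and `router_send` are illustrative names, and the booleans describe the situation at the moment the identity frame is sent.

```rust
/// Hypothetical model of what a ROUTER does with an outgoing message.
#[derive(Debug, PartialEq)]
enum RouterOutcome {
    Queued,
    SilentlyDropped, // send() still reports success
    HostUnreachable, // EHOSTUNREACH
    WouldBlock,      // EAGAIN
    Blocks,          // waits until SNDTIMEO or until the queue has room
}

fn router_send(peer_known: bool, peer_hwm_reached: bool, mandatory: bool, dontwait: bool) -> RouterOutcome {
    if !mandatory {
        // Default: unroutable or full means drop; never an error, never blocks,
        // which is why DONTWAIT makes no difference here
        if !peer_known || peer_hwm_reached {
            return RouterOutcome::SilentlyDropped;
        }
        return RouterOutcome::Queued;
    }
    if !peer_known {
        return RouterOutcome::HostUnreachable;
    }
    if peer_hwm_reached {
        return if dontwait { RouterOutcome::WouldBlock } else { RouterOutcome::Blocks };
    }
    RouterOutcome::Queued
}

fn main() {
    // The broker's situation: the worker process was killed, so its identity is gone
    for &mandatory in &[false, true] {
        for &dontwait in &[false, true] {
            let outcome = router_send(false, false, mandatory, dontwait);
            println!("mandatory={} dontwait={} -> {:?}", mandatory, dontwait, outcome);
        }
    }
}
```

For a killed worker, only the `mandatory=true` rows produce an error, regardless of DONTWAIT, which matches the difference between the JS and Rust brokers.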
fabienjuif commented 5 years ago

Yo, very nice, I forgot about it. Thank you!!

kysely commented 5 years ago

I think we've proven this is not a rust-zmq problem and the issue can be closed.

fabienjuif commented 5 years ago

Sure, I wanted to test it first. I'll do it right now and close the issue if it works 👌

fabienjuif commented 5 years ago
thread 'main' panicked at 'send should work 1: Host unreachable', src/libcore/result.rs:1009:5

Nice!

Thank you @kysely 😄
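With the error now surfacing, the failover idea from the original question (treat a worker as dead and hand the task to another one) can be sketched as below. Everything here is hypothetical: `SendError` stands in for `zmq::Error` (in real code the `send` closure would wrap the three `socket.send` calls and map `zmq::Error::EHOSTUNREACH` to `SendError::HostUnreachable`), and trying workers in list order is just one possible policy.

```rust
/// Hypothetical stand-in for zmq::Error.
#[derive(Debug, PartialEq)]
enum SendError {
    HostUnreachable,
    Other(String),
}

/// Try workers in order, forgetting any that turn out to be unreachable.
/// `send(worker, payload)` is whatever writes the frames on the ROUTER socket.
fn dispatch<F>(workers: &mut Vec<String>, payload: &str, mut send: F) -> Result<String, SendError>
where
    F: FnMut(&str, &str) -> Result<(), SendError>,
{
    while !workers.is_empty() {
        let worker = workers[0].clone();
        match send(&worker, payload) {
            Ok(()) => return Ok(worker),
            Err(SendError::HostUnreachable) => {
                // The worker is gone: drop it and try the next one
                workers.remove(0);
            }
            // Any other error is not about a dead worker, so pass it up
            Err(e) => return Err(e),
        }
    }
    Err(SendError::Other("no workers available".to_string()))
}

fn main() {
    let mut workers = vec!["worker-a".to_string(), "worker-b".to_string()];
    // Simulated socket: worker-a has been killed, worker-b is alive
    let result = dispatch(&mut workers, "task", |worker, _payload| {
        if worker == "worker-a" {
            Err(SendError::HostUnreachable)
        } else {
            Ok(())
        }
    });
    println!("{:?}, remaining workers: {:?}", result, workers);
}
```

Keeping the socket behind a closure makes the retry logic testable without a running broker; only the closure needs to know about rust-zmq.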