heydavid525 / hotbox

BSD 3-Clause "New" or "Revised" License

ROUTER socket would be blocked while sending msg to a known client who suddenly crashes. #1

Open holyglenn opened 9 years ago

holyglenn commented 9 years ago

Bug identified: zmq_util does not throw an error when the send should fail with EAGAIN. Code has been pushed to the repository.

aurickq commented 9 years ago

I'm not sure if I'm understanding this bug correctly.

  1. ROUTER 1 connects to ROUTER 2
  2. ROUTER 2 crashes or exits without closing
  3. ROUTER 1 sends a message to ROUTER 2

Are you saying that in the above situation, ROUTER 1 will block indefinitely on step 3?

holyglenn commented 9 years ago

@aurickq Thanks for the reply!

The procedure is:

  1. ROUTER 1 binds and ROUTER 2 connects.
  2. ROUTER 2 sends something and ROUTER 1 is replying.
  3. ROUTER 2 crashes or exits without closing
  4. ROUTER 1 will do one of the following, depending on its socket options:
     a) block indefinitely (ZMQ_ROUTER_MANDATORY == 1);
     b) return 0 without throwing a C++ exception/error (ZMQ_ROUTER_MANDATORY == 1 and ZMQ_SNDTIMEO set to a small value, say 300 ms);
     c) return successfully and silently discard the messages (ZMQ_ROUTER_MANDATORY == 0).

What happened:

For a), the send is expected to fail with EHOSTUNREACH. ROUTER does return EHOSTUNREACH if it has never seen the connected client before (that is, it does not know the client's address). After a sudden crash of a known client, however, it blocks.

For b), the natural thing to do is to set a send timeout. But a timeout is reported as EAGAIN, and in zmq.hpp, the C++ binding, the devs define the behavior for EAGAIN to be returning 0. Here is the code:

 inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
 {
     int nbytes = zmq_send (ptr, buf_, len_, flags_);
     if (nbytes >= 0)
         return (size_t) nbytes;
     if (zmq_errno () == EAGAIN) // Here is the devil.
         return 0;
     throw error_t ();
 }

In summary, when a known client suddenly crashes, the best you can get from a ROUTER socket is a return value of 0.

For c), this is the normal behavior of a ROUTER socket, as it was before ZMQ_ROUTER_MANDATORY was introduced.
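
A minimal sketch of how a caller might keep the timeout case visible, assuming it calls the C function zmq_send directly and reads zmq_errno() when the result is -1. SendOutcome and classify_send are hypothetical names for illustration, not part of libzmq or the C++ binding.

```cpp
#include <cerrno>

// Outcome of one send attempt, as seen by the caller.
enum class SendOutcome { Sent, TimedOut, Unroutable, Failed };

// rc:  return value of the send call (bytes queued, or -1 on error).
// err: error code captured right after the call; only read when rc == -1.
inline SendOutcome classify_send(int rc, int err) {
  if (rc >= 0) return SendOutcome::Sent;                    // includes empty frames
  if (err == EAGAIN) return SendOutcome::TimedOut;          // send timeout expired
  if (err == EHOSTUNREACH) return SendOutcome::Unroutable;  // peer not routable
  return SendOutcome::Failed;                               // any other error
}
```

With the raw C API this would be called as classify_send(rc, zmq_errno()) right after int rc = zmq_send(...). Testing for -1 rather than 0 matters because a successful send of an empty frame also returns 0, so a 0 from the C++ wrapper is ambiguous.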

holyglenn commented 9 years ago

@aurickq To be more exact: the client did not exactly crash; it exits normally while the server is still sending something to it. I also tried restarting the client with the same id, and the send was still blocked.

The devs discussed another case of a blocking send, where the client id is not known beforehand: http://lists.zeromq.org/pipermail/zeromq-dev/2012-October/018945.html That case has been fixed, hence the EHOSTUNREACH error.

The send code shared by all kinds of sockets in ZMQ is:

    //  Oops, we couldn't send the message. Wait for the next
    //  command, process it and try to send the message again.
    //  If timeout is reached in the meantime, return EAGAIN.
    while (true) {
        if (unlikely (process_commands (timeout, false) != 0)) {
            EXIT_MUTEX();
            return -1;
        }        
        rc = xsend (msg_);
        if (rc == 0)
            break;
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX();
            return -1;
        }
        if (timeout > 0) {
            timeout = (int) (end - clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                EXIT_MUTEX();
                return -1;
            }
        }
    }

Since the timeout takes effect, we can infer that the ROUTER socket's zmq_send is stuck in this infinite loop. Neither process_commands nor xsend blocks on its own.

process_commands is a non-blocking operation if timeout is 0. The commands are for internal ZMQ operations, and it is implemented using the poll() system call. I will look into the function xsend to see why no error has been raised.
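
The inference above can be checked against a small model of the loop. This is a simplified sketch, not libzmq code: model_send_loop, LoopResult, tick_ms and max_attempts are hypothetical, the try_send callable stands in for xsend, and the wait inside process_commands is reduced to a fixed tick. A timeout of 0 lies outside the quoted excerpt and is not modeled.

```cpp
#include <cerrno>
#include <functional>

// Result of the modeled loop: rc is 0 on success, -1 on error (err holds the
// code), or -2 when the model's attempt cap was hit while still retrying.
struct LoopResult {
  int rc;
  int err;
  int attempts;
};

// try_send:     stand-in for xsend; returns 0 on success, else an errno code.
// timeout_ms:   -1 for no send timeout, or a positive deadline in ms.
// tick_ms:      pretend time spent waiting for commands on each pass.
// max_attempts: cap so the model terminates where the real loop would not.
inline LoopResult model_send_loop(const std::function<int()>& try_send,
                                  int timeout_ms, int tick_ms,
                                  int max_attempts) {
  int elapsed_ms = 0;
  for (int attempt = 1; attempt <= max_attempts; ++attempt) {
    elapsed_ms += tick_ms;                          // wait for the next command
    const int err = try_send();                     // retry the send
    if (err == 0) return {0, 0, attempt};           // message went out
    if (err != EAGAIN) return {-1, err, attempt};   // hard error: give up
    if (timeout_ms > 0 && elapsed_ms >= timeout_ms)
      return {-1, EAGAIN, attempt};                 // deadline reached
  }
  return {-2, 0, max_attempts};                     // still retrying at the cap
}
```

In this model, a send that keeps failing with EAGAIN only leaves the loop when timeout_ms is positive; with -1 it retries until the artificial cap, which matches the observed difference between cases a) and b).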

aurickq commented 9 years ago

Thanks! This issue is very important for our training system as well.

However, it seems like I can't reproduce the problem. I am using ZeroMQ 4.1.2. Is this only a problem on earlier versions?

aurickq commented 9 years ago

Here is the code I'm using:

Server:

#include <unistd.h>
#include <zmq.h>
#include <cstdio>

int main() {
  int major, minor, patch;
  zmq_version(&major, &minor, &patch);
  printf("ZeroMQ %d.%d.%d\n", major, minor, patch);
  void *ctx = zmq_ctx_new();
  void *sock = zmq_socket(ctx, ZMQ_ROUTER);
  zmq_setsockopt(sock, ZMQ_IDENTITY, "server", 7);
  zmq_bind(sock, "tcp://127.0.0.1:10000");
  char addr[256];
  char buff[256];
  while (true) {
    int len = zmq_recv(sock, addr, 256, 0);
    zmq_recv(sock, buff, 256, 0);
    printf("recv %s\n", buff);
    zmq_send(sock, addr, len, ZMQ_SNDMORE);
    sleep(1);
    zmq_send(sock, "response", 9, 0);
    printf("send response\n");
  }
  zmq_close(sock);
  zmq_term(ctx);
}

Client:

#include <unistd.h>
#include <zmq.h>
#include <cstdio>

int main() {
  int major, minor, patch;
  zmq_version(&major, &minor, &patch);
  printf("ZeroMQ %d.%d.%d\n", major, minor, patch);
  void *ctx = zmq_ctx_new();
  void *sock = zmq_socket(ctx, ZMQ_ROUTER);
  int mandatory = 1;
  zmq_setsockopt(sock, ZMQ_ROUTER_MANDATORY,
      &mandatory, sizeof(int));
  zmq_connect(sock, "tcp://127.0.0.1:10000");
  char addr[256];
  char buff[256];
  sleep(1);
  while(true) {
    zmq_send(sock, "server", 7, ZMQ_SNDMORE);
    zmq_send(sock, "message", 8, 0);  // 7 characters plus the terminating NUL
    printf("send message\n");
    zmq_recv(sock, addr, 256, 0);
    zmq_recv(sock, buff, 256, 0);
    printf("recv %s\n", buff);
  }
  zmq_close(sock);
  zmq_term(ctx);
}

I am starting the server, then starting the client. Then I kill the client when the server is blocked on the sleep(1) statement. When the server unblocks, the client is gone, but the zmq_send call on the next line completes successfully without blocking.
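
The snippets pass hand-counted byte lengths to zmq_send and zmq_setsockopt. The counts for "server" (7) and "response" (9) include the terminating NUL; a count larger than the literal's size would read past its end. As a small sketch, a helper can derive the count from the literal itself. literal_size is a hypothetical name, not a libzmq function.

```cpp
#include <cstddef>

// Size in bytes of a string literal, including its terminating NUL.
// The size is taken from the array type, so it cannot be miscounted.
template <std::size_t N>
constexpr std::size_t literal_size(const char (&)[N]) {
  return N;
}

static_assert(literal_size("server") == 7, "6 characters plus NUL");
static_assert(literal_size("message") == 8, "7 characters plus NUL");
static_assert(literal_size("response") == 9, "8 characters plus NUL");
```

A call would then read zmq_send(sock, "message", literal_size("message"), 0). Whether the NUL should travel with the frame at all is a separate choice; the snippets here rely on it so that printf("%s") works on the received buffer.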

holyglenn commented 9 years ago

Hi @aurickq, either of the following modifications to your code reproduces the problem when the client is killed during the sleep pause; changing either the server or the client as shown is enough. But ZMQ_ROUTER_MANDATORY has to be set in both cases. By the way, I am using ZMQ 4.0.5.

The server program:

#include <unistd.h>
#include <zmq.h>
#include <cstdio>

int main() {
  int major, minor, patch;
  zmq_version(&major, &minor, &patch);
  printf("ZeroMQ %d.%d.%d\n", major, minor, patch);
  void *ctx = zmq_ctx_new();
  void *sock = zmq_socket(ctx, ZMQ_ROUTER);
  zmq_setsockopt(sock, ZMQ_IDENTITY, "server", 7);
// The problem happens with this option. 
// Otherwise everything is just discarded with a router socket, with no warning ever.
  int mandatory = 1;
  zmq_setsockopt(sock, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof(mandatory));
  zmq_bind(sock, "tcp://127.0.0.1:10000");
  char addr[256];
  char buff[256];
  while (true) {
    int len = zmq_recv(sock, addr, 256, 0);
    zmq_recv(sock, buff, 256, 0);
    printf("recv %s\n", buff);
    sleep(4);   // Sleep a little longer to allow a clean kill, then send both frames back to back.
    zmq_send(sock, addr, len, ZMQ_SNDMORE);
    zmq_send(sock, "response", 9, 0);
    printf("send response\n");
  }
  zmq_close(sock);
  zmq_term(ctx);
}

The client program: originally I was trying to figure out what the API documentation meant by "The zmq_send() function will clear all pending events on a socket.", so I wrote an async client.

#include <unistd.h>
#include <zmq.h>
#include <cstdio>

int main() {
  int major, minor, patch;
  zmq_version(&major, &minor, &patch);
  printf("ZeroMQ %d.%d.%d\n", major, minor, patch);
  void *ctx = zmq_ctx_new();
  void *sock = zmq_socket(ctx, ZMQ_ROUTER);
  int mandatory = 1;
  zmq_setsockopt(sock, ZMQ_ROUTER_MANDATORY,
      &mandatory, sizeof(int));
  zmq_connect(sock, "tcp://127.0.0.1:10000");
  char addr[256];
  char buff[256];
  sleep(1);
  for (int i=0; i < 10; i++) {
    zmq_send(sock, "server", 7, ZMQ_SNDMORE);
    zmq_send(sock, "message", 8, 0);  // 7 characters plus the terminating NUL
    printf("send message\n");
  }
  for (int i=0; i < 10; i++) {
    zmq_recv(sock, addr, 256, 0);
    zmq_recv(sock, buff, 256, 0);
    printf("recv %s\n", buff);
  }
  zmq_close(sock);
  zmq_term(ctx);
}

aurickq commented 9 years ago

Thanks, this is very helpful! I still cannot reproduce it using version 4.1.2, though. Perhaps it's worth double-checking, and switching to 4.1.2 if this is really the case.

Either way, this shouldn't be a problem for our training system since we do not need to set ROUTER_MANDATORY on the server.

holyglenn commented 9 years ago

Yeah, you are right. The default ROUTER behavior would suffice if nothing is to be done about the discarded messages. ZMQ_ROUTER_MANDATORY is for debugging purposes, though. I have used the latest 4.2.0 and the problem still exists. I will provide a workable pair.

Client:

#include <unistd.h>
#include <zmq.h>
#include <cstdio>

int main() {
  int major, minor, patch;
  zmq_version(&major, &minor, &patch);
  printf("ZeroMQ %d.%d.%d\n", major, minor, patch);
  void *ctx = zmq_ctx_new();
  void *sock = zmq_socket(ctx, ZMQ_ROUTER);
  int mandatory = 1;
  zmq_setsockopt(sock, ZMQ_ROUTER_MANDATORY,
      &mandatory, sizeof(int));
  zmq_connect(sock, "tcp://127.0.0.1:10000");
  char addr[256];
  char buff[256];
  sleep(1);
  for (int i=0; i < 10; i++) {
    zmq_send(sock, "server", 7, ZMQ_SNDMORE);
    zmq_send(sock, "message", 8, 0);  // 7 characters plus the terminating NUL
    printf("send message\n");
    zmq_recv(sock, addr, 256, 0);
    zmq_recv(sock, buff, 256, 0);
    printf("recv %s\n", buff);
  }
  zmq_close(sock);
  zmq_term(ctx);
}

Server:

#include <unistd.h>
#include <zmq.h>
#include <cstdio>

int main() {
  int major, minor, patch;
  zmq_version(&major, &minor, &patch);
  printf("ZeroMQ %d.%d.%d\n", major, minor, patch);
  void *ctx = zmq_ctx_new();
  void *sock = zmq_socket(ctx, ZMQ_ROUTER);
  zmq_setsockopt(sock, ZMQ_IDENTITY, "server", 7);
  int mandatory = 1;
  zmq_setsockopt(sock, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof(mandatory));
  zmq_bind(sock, "tcp://127.0.0.1:10000");
  char addr[256];
  char buff[256];
  while (true) {
    int len = zmq_recv(sock, addr, 256, 0);
    zmq_recv(sock, buff, 256, 0);
    printf("recv %s\n", buff);
    sleep(4);
    zmq_send(sock, addr, len, ZMQ_SNDMORE);
    zmq_send(sock, "response", 9, 0);
    printf("send response\n");
  }
  zmq_close(sock);
  zmq_term(ctx);
}

The procedure is to kill the client while the server is waiting in the sleep. After reading the code, I think even the latest version is expected to get stuck at the send unless a timeout is set.

heydavid525 commented 9 years ago

Nice discussion! One request: Weiren, could you put the code in the repo under experiment/zmq? That way I can see the code more easily. Thanks!

sailinglabx commented 9 years ago

Also, the latest stable zmq is 4.1.3. Let's use the stable release instead of experimental versions.

holyglenn commented 9 years ago

Yeah, I'll put them there. I have tested 4.1.2 and the result is the same.