zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.75k stars 2.36k forks source link

Client Crashes with XPUB-XSUB forwarder device #4197

Open Prinkesh opened 3 years ago

Prinkesh commented 3 years ago

Issue description

The pub-sub client crashes after the XPUB-XSUB forwarder device is shut down. We are able to reproduce this when the client and the forwarder run on different machines.

Steps to recreate

If we use a PUB-SUB forwarder instead, this issue doesn't happen. Why would this be the case? The issue also doesn't happen if we remove the HEARTBEAT_TIMEOUT setting from the publisher.

Environment

Minimal test code / Steps to reproduce the issue

1. Client Code

// Creates the PUB and SUB sockets, configures heartbeats on both, connects
// them to the two hard-coded forwarder endpoints, subscribes to the prefixes
// "0".."2999", and then polls the subscriber forever.
//
// NOTE: on success this constructor never returns; the receive loop at the
// bottom runs until the process is killed.
//
// @param log  Stream bound to the `myfile` member.
// @throws int (value 1) when connecting or subscribing raises zmq::error_t.
ZmqHandler::ZmqHandler(std::ostream &log):myfile(log){
  const std::string inConnString("tcp://192.168.151.8:19925");
  const std::string outConnString("tcp://192.168.151.8:19926");

  publisher = new zmq::socket_t(context, ZMQ_PUB);
  subscriber = new zmq::socket_t(context, ZMQ_SUB);

  int hbtTimeout = 30 * 1000; // 30 seconds
  int hbtInterval = 1 * 1000; // 1 second

  // NOTE(review): this sets the *send* high-water mark on the SUB socket;
  // ZMQ_RCVHWM may have been intended for the receive side - confirm.
  int HWM = 0;
  subscriber->setsockopt(ZMQ_SNDHWM, &HWM, sizeof(HWM));
  subscriber->setsockopt(ZMQ_HEARTBEAT_TIMEOUT, &hbtTimeout, sizeof(hbtTimeout));
  subscriber->setsockopt(ZMQ_HEARTBEAT_IVL, &hbtInterval, sizeof(hbtInterval));

  publisher->setsockopt(ZMQ_HEARTBEAT_TIMEOUT, &hbtTimeout, sizeof(hbtTimeout));
  publisher->setsockopt(ZMQ_HEARTBEAT_IVL, &hbtInterval, sizeof(hbtInterval));

  int linger = 0;
  publisher->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));

//  int rcvTimeOutInMilli = 100; // milliseconds
  // Track which endpoint/config key is being processed so the error message
  // printed in the catch block points at the right one.
  std::string currentZmqSocket;
  std::string zmqErrMsg;
  try {
     zmqErrMsg = "check ZmqPublishConn key in conf";
     currentZmqSocket = outConnString;
     publisher->connect(outConnString.c_str());

     zmqErrMsg = "check ZmqRecvConn key in conf";
     currentZmqSocket = inConnString;
     subscriber->connect(inConnString.c_str());

     // Register one subscription per decimal prefix "0".."2999".
     for (int i = 0; i < 3000; i++) {
         const std::string zkey = std::to_string(i);
         subscriber->setsockopt(ZMQ_SUBSCRIBE, zkey.c_str(), zkey.size());
     }
  }
  catch (const zmq::error_t &error) {
     std::cout <<  " ZmqHandler,E_ZMQ_SOCKET," << error.what() << "," << currentZmqSocket << "." << zmqErrMsg << std::endl;
     throw 1;
  }
  std::cout <<  ",Publish," << outConnString << std::endl;
  std::cout <<  ",Recv," << inConnString << std::endl;

  // Non-blocking receive loop: print every message that arrives, otherwise
  // back off for 50 ms before polling again.
  while (1) {
    zmq::message_t msg;
    bool req = subscriber->recv(&msg, ZMQ_DONTWAIT);
    if (!req) {
          usleep(50000);
          // Do other tasks 
    } else {
      std::string msgStr(static_cast<char*>(msg.data()), msg.size());
      std::cout  <<  "RECV," << msgStr << std::endl;
    }
  }

}
  2. Forwarder code
    
    import zmq

def main():
    """Bind an XPUB frontend and an XSUB backend and run a forwarder device.

    Blocks inside ``zmq.device`` until it returns or raises. Any ``Exception``
    is printed, and the sockets and context that were created are always
    released afterwards.
    """
    # Pre-bind the names so the cleanup in ``finally`` is safe even when
    # setup fails before a socket (or the context) has been created.
    context = None
    frontend = None
    backend = None
    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.XPUB)
        frontend.bind("tcp://192.168.151.8:19925")

        # Socket facing services
        backend = context.socket(zmq.XSUB)
        backend.bind("tcp://192.168.151.8:19926")

        zmq.device(zmq.FORWARDER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        if frontend is not None:
            frontend.close()
        if backend is not None:
            backend.close()
        if context is not None:
            context.term()

# Run the forwarder only when this file is executed as a script.
if __name__ == "__main__":
    main()


# What's the actual result? (include assertion message & call stack if applicable)
```gdb
#0  0x00007f4e013a5207 in raise () from /lib64/libc.so.6
#1  0x00007f4e013a68f8 in abort () from /lib64/libc.so.6
#2  0x0000000000415dc9 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x465431 "!_io_error") at src/err.cpp:87
#3  0x0000000000447263 in zmq::stream_engine_t::out_event (this=0x7f4dfc008470) at src/stream_engine.cpp:367
#4  0x0000000000447e5c in zmq::stream_engine_t::timer_event (this=0x7f4dfc008470, id_=<optimized out>) at src/stream_engine.cpp:1025
#5  0x000000000041f05e in zmq::poller_base_t::execute_timers (this=this@entry=0x20a06d0) at src/poller_base.cpp:102
#6  0x0000000000415458 in zmq::epoll_t::loop (this=0x20a06d0) at src/epoll.cpp:173
#7  0x000000000042d746 in thread_routine (arg_=0x20a0728) at src/thread.cpp:182
#8  0x00007f4e01f62dd5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007f4e0146cead in clone () from /lib64/libc.so.6
```

What's the expected result?

ljluestc commented 1 year ago
#include <zmq.hpp>
#include <iostream>
#include <unistd.h>

/// Client-side holder for the ZeroMQ context and the publisher/subscriber
/// socket pair. The sockets are heap-allocated and deleted in the destructor.
class ZmqHandler {
public:
    /// @param log  Stream bound to the `myfile` member.
    ZmqHandler(std::ostream& log) : myfile(log) {
        // ... Your previous code ...

        // ZMQ_XPUB_VERBOSE is an option of XPUB sockets. This class has no
        // XPUB socket (it has no `frontend` member), so the option cannot be
        // set here; it has to be set on the XPUB socket itself.
    }

    /// Deletes both sockets; deleting a null pointer is a no-op, so this is
    /// safe even if the constructor never allocated them.
    ~ZmqHandler() {
        delete publisher;
        delete subscriber;
    }

    // The class owns raw pointers, so a copy would lead to a double delete.
    ZmqHandler(const ZmqHandler&) = delete;
    ZmqHandler& operator=(const ZmqHandler&) = delete;

private:
    zmq::context_t context;
    zmq::socket_t* publisher = nullptr;   // owned; released in the destructor
    zmq::socket_t* subscriber = nullptr;  // owned; released in the destructor
    std::ostream& myfile;
};

/// Runs an XPUB/XSUB forwarder: binds both sockets and blocks in the proxy
/// loop, relaying traffic between them.
///
/// @return 1 if a std::exception was caught; 0 if the proxy call returns
///         without throwing.
int main() {
    try {
        zmq::context_t context(1);

        // Socket facing clients
        zmq::socket_t frontend(context, ZMQ_XPUB);
        frontend.bind("tcp://192.168.151.8:19925");

        // Socket facing services
        zmq::socket_t backend(context, ZMQ_XSUB);
        backend.bind("tcp://192.168.151.8:19926");

        // ZMQ_XPUB_VERBOSE=1 makes the XPUB socket pass every subscription
        // message upstream, including duplicates of subscriptions it already
        // holds (by default only new subscriptions are passed on).
        int verbose = 1;
        frontend.setsockopt(ZMQ_XPUB_VERBOSE, &verbose, sizeof(verbose));

        // cppzmq has no zmq::device; zmq::proxy is its wrapper for the
        // built-in proxy loop. No capture socket is used.
        zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
    }
    catch (const std::exception& e) {
        std::cout << "Error: " << e.what() << std::endl;
        std::cout << "Bringing down zmq device" << std::endl;
        return 1;  // report the failure to the caller instead of exiting 0
    }
    return 0;
}