zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

[zmq 4.3.4] PUB-SUB (TCP) mode crashes on assertion !_io_error #4364

Open: ABlueLight opened this issue 2 years ago

ABlueLight commented 2 years ago


Issue description

When using ZMQ_PUB and ZMQ_SUB sockets with ZMQ_HEARTBEAT_TIMEOUT set, the program may crash with the following assertion failure:

Assertion failed: !_io_error (/home/users/jiangkuan.liu/tools/zeromq-4.3.4/src/stream_engine_base.cpp:331)

Environment

Minimal test code / Steps to reproduce the issue

1. Minimal test code

PUB code

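#include <zmq.h>

#include <atomic>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>

void* sockets;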
std::mutex mtx;
std::atomic_int item_nums;
void *context = nullptr;

int main() {
  std::vector <std::string> zmq_protocol_items;
  std::string ip_port = "tcp://127.0.0.1:4124";
  zmq_protocol_items.push_back(ip_port);

  context = zmq_ctx_new();
  if (context == nullptr) {
    std::cout << "init zmq context failed!\n";
    return -1;
  }
  for (auto &proto : zmq_protocol_items) {
    std::cout << proto << "\n";
    void *socket = zmq_socket(context, ZMQ_PUB);

    int zmq_sndhwm = 100;  // send high-water mark, in messages
    zmq_setsockopt(socket, ZMQ_SNDHWM, &zmq_sndhwm, sizeof(int));

    int zmq_heart_beat_ivl = 5;  // ZMTP PING interval, in milliseconds
    zmq_setsockopt(socket, ZMQ_HEARTBEAT_IVL, &zmq_heart_beat_ivl,
                   sizeof(int));

    int zmq_heart_beat_timeout = 5;  // heartbeat timeout, in milliseconds
    zmq_setsockopt(socket, ZMQ_HEARTBEAT_TIMEOUT, &zmq_heart_beat_timeout,
                   sizeof(int));
    int ret = zmq_bind(socket, proto.c_str());
    if (ret < 0) {
      std::cout << "zmq bind failed!\n";
      continue;
    }
    sockets = socket;
  }
  std::vector<char *> data_vec;
  std::vector<int> data_size_vec;
  data_size_vec.push_back(10);
  data_size_vec.push_back(10);
  data_size_vec.push_back(100);
  char *data_0 = new char[10];
  data_0[0] = 1;
  char *data_1 = new char[10];
  data_1[0] = 2;
  char *data_2 = new char[100];
  data_2[0] = 3;
  data_vec.push_back(data_0);
  data_vec.push_back(data_1);
  data_vec.push_back(data_2);
  int send_indx = 0;
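  while (1) {
    int nums = 0;
    // Frames 0 and 1 carry ZMQ_SNDMORE; frame 2 ends the 3-frame message.
    int flag = ZMQ_DONTWAIT | ZMQ_SNDMORE;
    int send_ret = 0;
    for (std::size_t i = 0; i < data_vec.size(); i++) {
      zmq_msg_t data_msg;
      // Zero-copy message: the buffers are never freed, so no free
      // function is passed.
      if (zmq_msg_init_data(&data_msg, data_vec[i], data_size_vec[i],
                            NULL, NULL) != 0) {
        std::cout << "zmq_msg_init_data failed!\n";
        continue;
      }
      if (i == 2) {
        // Last frame: non-blocking, without ZMQ_SNDMORE.
        flag = ZMQ_DONTWAIT;
      }
      send_ret = zmq_msg_send(&data_msg, sockets, flag);
      // Required if the send failed; harmless after a successful send.
      zmq_msg_close(&data_msg);
    }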
    send_indx++;
  }
  zmq_close(sockets);
  zmq_ctx_shutdown(context);
  zmq_ctx_term(context);
}

SUB code

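#include <zmq.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

std::vector<void*> sockets;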
std::mutex mtx;
std::atomic_int item_nums;
void *context = nullptr;

int main() {
  std::vector <std::string> zmq_protocol_items;
  std::string ip_port = "tcp://127.0.0.1:4124";
  zmq_protocol_items.push_back(ip_port);
  context = zmq_ctx_new();
  if (context == nullptr) {
    std::cout << "init zmq context failed!\n";
    return -1;
  }
  for (auto &proto : zmq_protocol_items) {
    std::cout << proto << "\n";
    void *socket = zmq_socket(context, ZMQ_SUB);
    zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);  // subscribe to everything
    int hwm = 100;  // receive high-water mark, in messages
    zmq_setsockopt(socket, ZMQ_RCVHWM, &hwm, sizeof(int));

    int zmq_heart_beat_ivl = 5;  // ZMTP PING interval, in milliseconds
    zmq_setsockopt(socket, ZMQ_HEARTBEAT_IVL, &zmq_heart_beat_ivl,
                   sizeof(int));

    int zmq_heart_beat_timeout = 5;  // heartbeat timeout, in milliseconds
    zmq_setsockopt(socket, ZMQ_HEARTBEAT_TIMEOUT, &zmq_heart_beat_timeout,
                   sizeof(int));
    int ret = zmq_connect(socket, proto.c_str());
    if (ret < 0) {
      std::cout << "zmq connect failed!\n";
      continue;
    }
    std::unique_lock<std::mutex> lk(mtx);
    sockets.push_back(socket);
    lk.unlock();
  }
  if (sockets.empty()) {
    return -1;  // nothing connected; sockets[0] below would be invalid
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));

  while (1) {
    zmq_msg_t msg;
    if (zmq_msg_init(&msg) < 0) {
      std::cout << "rpc server recv msg init failed\n";
      continue;
    }
    // Blocking receive; the payload is discarded and the message is
    // closed whether or not zmq_msg_recv succeeded.
    zmq_msg_recv(&msg, sockets[0], 0);
    zmq_msg_close(&msg);
    // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
  zmq_ctx_shutdown(context);
  zmq_ctx_term(context);
  return 0;
}

2. Steps

First run the SUB program, then run the PUB program. Starting two SUB programs before the PUB program makes the error easier to reproduce.

If I set ZMQ_HEARTBEAT_TIMEOUT to 0, or do not set it at all, the program does not crash. With a larger ZMQ_HEARTBEAT_TIMEOUT the bug becomes very difficult to trigger. I know that setting ZMQ_HEARTBEAT_TIMEOUT equal to ZMQ_HEARTBEAT_IVL, and to such a small value, is not a normal configuration; it simply makes the bug easy to reproduce. With reasonable ZMQ_HEARTBEAT_TIMEOUT and ZMQ_HEARTBEAT_IVL values the bug is very rare, but it can still happen.
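The report notes that equal, very small interval and timeout values make the crash easy to trigger, while larger timeouts make it rare. A small pre-flight check can flag such settings before they are passed to zmq_setsockopt.

This is only a sketch. The function name heartbeat_warnings, the 100 ms floor and the 3x timeout-to-interval ratio are hypothetical choices for illustration, not limits enforced or recommended by libzmq. The check does not touch libzmq itself.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical pre-flight check for heartbeat settings (all values in ms).
// The defaults for min_ivl_ms and min_ratio are illustrative assumptions,
// not limits enforced or recommended by libzmq.
struct HeartbeatSettings {
  int ivl_ms;      // value intended for ZMQ_HEARTBEAT_IVL (0 = disabled)
  int timeout_ms;  // value intended for ZMQ_HEARTBEAT_TIMEOUT (0 = not set)
};

std::vector<std::string> heartbeat_warnings(const HeartbeatSettings &s,
                                            int min_ivl_ms = 100,
                                            int min_ratio = 3) {
  assert(min_ivl_ms >= 0 && min_ratio >= 1);
  std::vector<std::string> warnings;
  if (s.ivl_ms <= 0) {
    // The timeout only has an effect when a positive interval is set.
    if (s.timeout_ms > 0) {
      warnings.push_back("timeout is set but heartbeats are disabled");
    }
    return warnings;
  }
  if (s.ivl_ms < min_ivl_ms) {
    warnings.push_back("interval is below " + std::to_string(min_ivl_ms) +
                       " ms");
  }
  if (s.timeout_ms > 0 && s.timeout_ms < min_ratio * s.ivl_ms) {
    warnings.push_back("timeout is less than " + std::to_string(min_ratio) +
                       "x the interval");
  }
  return warnings;
}
```

With the reporter's values, heartbeat_warnings({5, 5}) returns both warnings, while {1000, 5000} returns none. Passing the check does not rule out the assertion: the reporter still saw it, more rarely, with reasonable values.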

What's the actual result? (include assertion message & call stack if applicable)

Crash 1: assertion failed on _io_error


Assertion failed: !_io_error (/home/users/jiangkuan.liu/tools/zeromq-4.3.4/src/stream_engine_base.cpp:331)

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7ffff62b7700 (LWP 26901)]
0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
54      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
#1  0x00007ffff6cf533a in __GI_abort () at abort.c:89
#2  0x00007ffff7b65e5e in zmq::zmq_abort(char const*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#3  0x00007ffff7b92992 in zmq::stream_engine_base_t::out_event() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#4  0x00007ffff7bb04fa in zmq::zmtp_engine_t::process_heartbeat_message(zmq::msg_t*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#5  0x00007ffff7b9247b in zmq::stream_engine_base_t::decode_and_push(zmq::msg_t*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#6  0x00007ffff7b93036 in zmq::stream_engine_base_t::restart_input() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#7  0x00007ffff7b66b6c in zmq::io_thread_t::in_event() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#8  0x00007ffff7b6560e in zmq::epoll_t::loop() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#9  0x00007ffff7b99ac9 in thread_routine () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#10 0x00007ffff70676e4 in start_thread (arg=0x7ffff62b7700) at pthread_create.c:333
#11 0x00007ffff6da729d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

Crash 2: assertion failed on _input_stopped


Assertion failed: _input_stopped (/home/users/jiangkuan.liu/tools/zeromq-4.3.4/src/stream_engine_base.cpp:417)

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7ffff62b7700 (LWP 3216)]
0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
54      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
#1  0x00007ffff6cf533a in __GI_abort () at abort.c:89
#2  0x00007ffff7b65e5e in zmq::zmq_abort(char const*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#3  0x00007ffff7b93212 in zmq::stream_engine_base_t::restart_input() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#4  0x00007ffff7b66b6c in zmq::io_thread_t::in_event() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#5  0x00007ffff7b6560e in zmq::epoll_t::loop() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#6  0x00007ffff7b99ac9 in thread_routine () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#7  0x00007ffff70676e4 in start_thread (arg=0x7ffff62b7700) at pthread_create.c:333
#8  0x00007ffff6da729d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb)

What's the expected result?

In this situation, the program should not crash.

ABlueLight commented 2 years ago

Any advice on how to avoid this bug?

ABlueLight commented 2 years ago

Can anyone offer help? Thanks.

ABlueLight commented 2 years ago

https://github.com/zeromq/libzmq/issues/4229 https://github.com/zeromq/libzmq/issues/3937

These seem to have the same root cause.

jmwample commented 2 years ago

Not sure if this is still relevant, but I'm leaving a comment just in case. I ran into this through the golang wrapper for this library (github.com/pebbe/zmq4). I don't think there is a direct fix, but in my case the cause has been thread starvation, which is likely the cause in general. Larger heartbeat interval and timeout values can delay the failure, but if your zmq thread starves, it will still trigger this assertion.

My suggestion for a "work-around" would be to review where things are blocking (e.g., on mutexes), where threads are being launched, and when threads are being scheduled. The goal is to ensure that no zmq socket thread ever blocks for longer than the heartbeat timeout.
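One way to act on this suggestion is to time the places where a thread that drives ZeroMQ sockets may block. Each wait can then be compared against a budget chosen below the heartbeat timeout.

The sketch below is illustrative only. BlockingBudget and timed_lock are hypothetical helpers written for this example, not part of libzmq or its wrappers. Deriving the budget from ZMQ_HEARTBEAT_TIMEOUT is also an assumption. The helpers only observe waits in your own code; starvation of libzmq's internal I/O threads is outside what such a check can see.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>

// Hypothetical helper: tracks how long a thread spent blocked and whether
// each wait stayed within a budget chosen below the heartbeat timeout.
// Not thread-safe: use one instance per thread.
class BlockingBudget {
 public:
  using duration = std::chrono::steady_clock::duration;

  explicit BlockingBudget(std::chrono::milliseconds budget) : budget_(budget) {
    assert(budget.count() > 0);
  }

  // Records one measured wait; returns true if it was within budget.
  bool record(duration waited) {
    if (waited > worst_) worst_ = waited;
    return waited <= budget_;
  }

  // Longest wait recorded so far.
  duration worst() const { return worst_; }

 private:
  duration budget_;
  duration worst_{};
};

// Locks `m`, timing how long the acquisition took. If within_budget is
// non-null, it receives whether the wait stayed within the budget.
template <typename Mutex>
std::unique_lock<Mutex> timed_lock(Mutex &m, BlockingBudget &budget,
                                   bool *within_budget = nullptr) {
  const auto start = std::chrono::steady_clock::now();
  std::unique_lock<Mutex> lock(m);
  const bool ok = budget.record(std::chrono::steady_clock::now() - start);
  if (within_budget != nullptr) *within_budget = ok;
  return lock;
}
```

A caller might log and investigate each timed_lock call that reports false. worst() gives the longest observed wait, which can be compared against the configured ZMQ_HEARTBEAT_TIMEOUT.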

ABlueLight commented 2 years ago

@jmwample thanks for your suggestion