erpc-io / eRPC

Efficient RPCs for datacenter networks
https://erpc.io/
Other
835 stars 137 forks source link

Segmentation fault happens frequently when server send requests to client #113

Open WWaynee opened 1 month ago

WWaynee commented 1 month ago

I am currently writing a test in which the client sends a request to the server and, after some asynchronous processing, the server sends the result back to the client as a new request rather than by directly enqueuing a response in the server thread. To do this, the server has to create sessions to the client's Rpc objects. However, when the server sends requests to the client, a segmentation fault occurs. The crash happens in the enqueue_request function, on this line: https://github.com/erpc-io/eRPC/blob/82b31fc4df53dd7a2aff1fc6ad1a3888c5c7b6ac/src/rpc_impl/rpc_req.cc#L27 I printed the variable that size() reads: accessing the size_t member free_index_ is what triggers the segmentation fault. https://github.com/erpc-io/eRPC/blob/82b31fc4df53dd7a2aff1fc6ad1a3888c5c7b6ac/src/util/fixed_vector.h#L36C3-L36C10 I don't know the real cause of the problem. Does the startup order of the client and server matter? I start the server first and then the client, and I run the event loop in a while loop, checking the number of connected sessions until all of them are successfully connected.

anujkaliaiitd commented 1 month ago

Interesting. There is a test that stresses sending requests from within request handlers: https://github.com/erpc-io/eRPC/blob/master/tests/client_tests/req_in_req_func_test.cc. If you are able to create a small example program that reproduces the issue, I can take a look.

WWaynee commented 1 month ago

Here is a small example (C++20) that can segfault; sometimes it runs successfully. The crash is still caused by accessing the variable free_index_. It is quite different from the example you provided, which sends requests from within request handlers. In my test, the server sends requests to client Rpc objects that have not themselves created sessions to the server's Rpc objects. Sometimes the segfault happens in the client, sometimes in the server.

Here is my test code. test.h

#include <string>
#include <vector>
#include "rpc.h"

// Request types used by the test. Each value is registered with a Nexus via
// register_req_func() and passed as the request type to enqueue_request().
enum RPCType : int {
    // Handled by Test1handler in test_server.cc; sent by the client.
    kTest1 = 1,
    // Handled by Test2handler in test_client.cc; sent by the server.
    kTest2 = 2,
};

// Nexus URIs of the two test processes. Declared `inline` because this header
// defines them: without it, including test.h from more than one translation
// unit of the same program would violate the one-definition rule.
inline std::string client_uri = "localhost:31850";
inline std::string server_uri = "localhost:31851";

// Random number generator consumed by FastRandSessionNum().
inline erpc::FastRand fastrand_;

// Picks a pseudo-random element of `sn_vec` (a list of session numbers).
//
// Throws std::out_of_range if `sn_vec` is empty instead of reading past the
// end of the vector.
inline auto FastRandSessionNum(const std::vector<int> &sn_vec) -> int {
    // Multiply-shift maps the 32-bit random value onto [0, size). The product
    // is computed in 64 bits explicitly so the result does not depend on the
    // width of size_t.
    const uint64_t scaled =
        static_cast<uint64_t>(fastrand_.next_u32()) * sn_vec.size();
    const auto rand_index = static_cast<size_t>(scaled >> 32);
    return sn_vec.at(rand_index);
}

// Per-request state passed as the `tag` of enqueue_request() and handed back
// to ContFunc when the request completes.
struct ContContextT {
  // 0 while the request is outstanding; ContFunc sets it to 1.
  int finished_;
  // Callers store the data pointer of the response message buffer here.
  char *buf_;
};

// Continuation passed to enqueue_request(): marks the request whose tag is
// `tag` as finished so that the caller's polling loop can stop.
//
// `tag` must point to a live ContContextT. The first parameter (the Rpc
// context) is not used. Declared `inline` because the function is defined in
// a header.
inline void ContFunc(void * /*context*/, void *tag) {
  // A void* that originally pointed to a ContContextT converts back with
  // static_cast; reinterpret_cast is not needed.
  auto *request_state = static_cast<ContContextT *>(tag);
  request_state->finished_ = 1;
}

// Number of event-loop Rpc objects (and threads) each process creates.
// `inline` because the variable is defined in a header.
inline int rpc_count = 4;

test_client.cc

#include "msg_buffer.h"
#include "transport_impl/infiniband/ib_transport.h"
#include "test.h"
#include "nexus.h"
#include <cstdint>
#include <thread>
#include <iostream>
#include <memory>
#include <rpc.h>
#include <sys/sdt.h>
#include <sys/types.h>
#include <atomic>

// Process-wide Nexus; created in main().
std::shared_ptr<erpc::Nexus> nexus{};

std::vector<std::thread> jobs{};
// Incremented by SessionHandler; main() polls it while connecting.
std::atomic<int> server_connected_count{0};
// Session numbers returned by create_session() in main().
std::vector<int> session_nums{};

// Set by Test2handler once the server's request has arrived.
std::atomic<bool> get_result{false};
// Threads running EventLoopThread.
std::vector<std::thread> client_threads{};

// Context object given to each event-loop Rpc on construction and received
// back as `_ctx` in Test2handler.
struct ClientContext {
  // Rpc that owns this context; assigned in EventLoopThread after the Rpc is
  // constructed.
  erpc::Rpc<erpc::CTransport> *rpc_ = nullptr;
  // Id the Rpc was created with. No default: set via designated initializer.
  size_t rpc_id_;
  // Not read or written by the code in this file.
  uint64_t nr_reqs_ = 0;
};

void SessionHandler(int /*unused*/, erpc::SmEventType /*unused*/,
                    erpc::SmErrType /*unused*/, void * /*unused*/) {
  server_connected_count.fetch_add(1);
}

// Session-management callback for the Rpc objects created in EventLoopThread.
// It deliberately ignores every event.
void SessionHandler2(int, erpc::SmEventType, erpc::SmErrType, void *) {}

// Request handler for RPCType::kTest2 (the server's "result" request).
// Sends a response from the preallocated response buffer, then wakes main()
// through get_result.
//
// req_handle: handle of the incoming request.
// _ctx:       the ClientContext of the Rpc that received the request.
void Test2handler(erpc::ReqHandle *req_handle, void *_ctx) {
  auto *c = static_cast<ClientContext *>(_ctx);

  // The server allocates a 16-byte buffer for this response (see Test1handler
  // in test_server.cc), so the response must not be larger than that.
  // NOTE(review): eRPC's examples resize pre_resp_msgbuf_ before calling
  // enqueue_response(); without it the response presumably carries the
  // buffer's full preallocated size -- confirm against the eRPC API docs.
  constexpr size_t kRespSize = 16;
  auto &resp = req_handle->pre_resp_msgbuf_;
  c->rpc_->resize_msg_buffer(&resp, kRespSize);
  c->rpc_->enqueue_response(req_handle, &resp);

  std::cout<<"Receive result from server"<<std::endl;
  get_result.store(true);
  get_result.notify_all();
}

// Thread body: creates the Rpc with id `rpc_id` on `nexus` and runs its event
// loop until Test2handler sets get_result.
//
// The Rpc is held by a unique_ptr, so it is destroyed on every exit path,
// including an exception thrown from the event loop.
void EventLoopThread(erpc::Nexus *nexus, size_t rpc_id) {
  ClientContext client_context = {.rpc_id_ = rpc_id};
  // Declared after client_context, so the Rpc is destroyed first and never
  // outlives the context it was given.
  auto rpc = std::make_unique<erpc::Rpc<erpc::CTransport>>(
      nexus, &client_context, static_cast<uint8_t>(rpc_id), SessionHandler2);
  client_context.rpc_ = rpc.get();

  while (!get_result.load()) {
    rpc->run_event_loop(1000);
  }
}

// Client entry point:
//   1. starts rpc_count event-loop threads (Rpc ids 0 .. rpc_count-1) that
//      serve kTest2 requests,
//   2. creates Rpc id rpc_count and connects it to the server's Rpcs,
//   3. sends one kTest1 request and waits for its response,
//   4. blocks until Test2handler reports the server's follow-up request.
// Returns 0 on success, 1 if no session to the server could be created.
int main() {
  nexus = std::make_shared<erpc::Nexus>(client_uri);
  nexus->register_req_func(RPCType::kTest2, Test2handler);

  for (int i = 0; i < rpc_count; i++) {
    client_threads.emplace_back(EventLoopThread, nexus.get(), i);
  }

  // Connect Server
  auto rpc_ = std::make_unique<erpc::Rpc<erpc::CTransport>>(
      nexus.get(), nullptr, rpc_count, SessionHandler);
  for (int i = 0; i < rpc_count; i++) {
    const int session_num = rpc_->create_session(server_uri, i);
    if (session_num >= 0) {
      session_nums.push_back(session_num);
    }
  }

  if (session_nums.empty()) {
    // Nothing to send on. Stop the event-loop threads and join them before
    // returning: destroying a joinable std::thread calls std::terminate.
    std::cerr << "Failed to create any session to " << server_uri << std::endl;
    get_result.store(true);
    for (auto &worker : client_threads) {
      worker.join();
    }
    return 1;
  }

  // Wait only for the sessions that were actually created. Waiting for
  // rpc_count would spin forever if any create_session() call had failed.
  const int expected_sessions = static_cast<int>(session_nums.size());
  while (server_connected_count.load() < expected_sessions) {
    rpc_->run_event_loop_once();
  }

  // Send request to server
  auto req_buf = rpc_->alloc_msg_buffer_or_die(16);
  auto resp_buf = rpc_->alloc_msg_buffer_or_die(16);
  ContContextT cc = {0, reinterpret_cast<char *>(resp_buf.buf_)};
  rpc_->enqueue_request(FastRandSessionNum(session_nums), RPCType::kTest1,
                        &req_buf, &resp_buf, ContFunc, &cc);
  while (cc.finished_ == 0) {
    rpc_->run_event_loop_once();
  }
  rpc_->free_msg_buffer(req_buf);
  rpc_->free_msg_buffer(resp_buf);

  // Block until receive result from server
  get_result.wait(false);

  for (auto &worker : client_threads) {
    worker.join();
  }
  return 0;
}

test_server.cc

#include "msg_buffer.h"
#include "transport_impl/infiniband/ib_transport.h"
#include "test.h"
#include <cstdint>
#include <thread>
#include <memory>
#include "rpc.h"
#include <sys/sdt.h>
#include <sys/types.h>
#include <util/numautils.h>
#include <iostream>

// Process-wide Nexus; created in main().
std::shared_ptr<erpc::Nexus> nexus{};

// Threads running EventLoopThread.
std::vector<std::thread> jobs{};
std::atomic<int> gConnectedSessionNum{0};
// Set by the request handling path once the test is done; every event-loop
// thread polls it.
std::atomic<bool> serverStop{false};

// Per-thread: the sessions this thread's Rpc opened to the client, and how
// many session events its handler has counted.
thread_local std::vector<int> session_nums{};
thread_local std::atomic<int> client_connected_count{0};

// Context object given to each server Rpc on construction and received back
// as `_ctx` in Test1handler.
struct ServerContext {
  // Rpc that owns this context; assigned in EventLoopThread after the Rpc is
  // constructed.
  erpc::Rpc<erpc::CTransport> *rpc_ = nullptr;
  // Id the Rpc was created with. No default: set via designated initializer.
  size_t rpc_id_;
  // Not read or written by the code in this file.
  uint64_t nr_reqs_ = 0;
};

void ServerHandler(int /*unused*/, erpc::SmEventType /*unused*/,
                    erpc::SmErrType /*unused*/, void * /*unused*/) {
  client_connected_count.fetch_add(1);
}

void EventLoopThread(size_t rpc_id) {
  ServerContext server_context = {.rpc_id_ = rpc_id};
  server_context.rpc_ =
      new erpc::Rpc<erpc::CTransport>(nexus.get(), &server_context, rpc_id, ServerHandler);
  // Connect to client
  for (int i = 0; i < rpc_count; i++) {
    int session_num = server_context.rpc_->create_session(client_uri, i);
    if (session_num >= 0) {
      session_nums.push_back(i);
    }
  }
  while (client_connected_count.load() != rpc_count) {
    server_context.rpc_->run_event_loop_once();
  }

  while (true) {
    server_context.rpc_->run_event_loop(1000);
    if (serverStop.load()) {
      break;
    }
  }
  delete server_context.rpc_;
}

void Test1handler(erpc::ReqHandle *req_handle, void *_ctx) {
  auto *c = static_cast<ServerContext*>(_ctx);
  auto data = req_handle->get_req_msgbuf()->buf_;
  auto &resp = req_handle->pre_resp_msgbuf_;

  // Send request to Client
  auto req_buf = c->rpc_->alloc_msg_buffer_or_die(16);
  auto resp_buf = c->rpc_->alloc_msg_buffer_or_die(16);
  ContContextT cc = {0, reinterpret_cast<char *>(resp_buf.buf_)};
  c->rpc_->enqueue_request(FastRandSessionNum(session_nums), RPCType::kTest2, &req_buf, &resp_buf, ContFunc, &cc);
  while (cc.finished_ == 0) {
    c->rpc_->run_event_loop_once();
  }
  // Enqueue a response to the original request from client
  c->rpc_->enqueue_response(req_handle, &resp);
  serverStop.store(true);
}

int main() {
  nexus = std::make_shared<erpc::Nexus>(server_uri);
  nexus->register_req_func(RPCType::kTest1, Test1handler);
  for (int i = 0; i < rpc_count; i++) {
    jobs.emplace_back(EventLoopThread, i);
  }
  for (int i = 0; i < rpc_count; i++) {
    jobs[i].join();
  }
  return 0;
}
WWaynee commented 1 month ago

It seems that something goes wrong when enqueuing requests on the server side, even though the session is reported as already connected. That's strange.