apache / brpc

brpc is an industrial-grade RPC framework written in C++, often used in high-performance systems such as search, storage, machine learning, advertising, and recommendation. "brpc" means "better RPC".
https://brpc.apache.org
Apache License 2.0

Stream RPC: when the server sends a message to the client, the receiver fails to parse the response and the RPC times out #2772

Closed Slontia closed 1 month ago

Slontia commented 1 month ago

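Describe the bug

When using stream RPC to send a message from the server to the client, I hit a check failure on the client side. The cause is a message parsing failure: the expected protocol name could not be parsed from the header (the parsing logic is in ParseRpcMessage). Some time later, the RPC fails with a timeout.

Because the examples only cover a client sending stream messages to a server, I am not sure whether my usage here is correct.

This is the check failure I found in the log: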

14473 F0926 11:37:47.618417 7fb109dfe700 /my_workspace/incubator-brpc/incubator-brpc-1.0.0/src/brpc/stream.cpp:617] Check failed: false.
14474 #0 0x0000062f041e brpc::Stream::HandleRpcResponse()
14475 #1 0x0000062f0701 brpc::Stream::Consume()
14476 #2 0x000006144db0 bthread::ExecutionQueueBase::_execute()
14477 #3 0x000006147458 bthread::ExecutionQueueBase::_execute_tasks()
14478 #4 0x000006160fd7 bthread::TaskGroup::task_runner()
14479 #5 0x0000061443d1 bthread_make_fcontext

I added some logging here and observed that the header did not contain "PRPC".

 ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket,
                             bool /*read_eof*/, const void*) {
     char header_buf[12];
     const size_t n = source->copy_to(header_buf, sizeof(header_buf));
     if (n >= 4) {
         void* dummy = header_buf;
         if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") {
             LOG(ERROR) << "header is not PRPC, it is " << header_buf; // this branch was hit
             return MakeParseError(PARSE_ERROR_TRY_OTHERS);
         }
     } else {
         if (memcmp(header_buf, "PRPC", n) != 0) {
             LOG(ERROR) << "header is not PRPC, it is " << header_buf;
             return MakeParseError(PARSE_ERROR_TRY_OTHERS);
         }
     }
   ...
}

Error in the log:

14453 E0926 11:37:43.566200 7fb10c9fe700 /my_workspace/incubator-brpc/incubator-brpc-1.0.0/src/brpc/policy/baidu_rpc_protocol.cpp:99] header is not PRPC, it is
14454 ^R^H<8b><80><80>¨ï<9a>½3^P
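
The logged header above is hard to read because `header_buf` holds raw, non-printable bytes. Streaming a `char[12]` into the log also treats it as a C string, so the output stops at the first NUL byte or runs past the buffer if there is none. A minimal sketch of a safer debugging aid follows. `HasPrpcMagic` and `HexDump` are hypothetical helper names, not part of brpc, and the sketch uses only the standard library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <string>

// Returns true when the first 4 bytes of `buf` are the ASCII magic "PRPC".
// memcmp avoids the type-punned uint32_t comparison used in the snippet above.
bool HasPrpcMagic(const char* buf, std::size_t n) {
    return n >= 4 && std::memcmp(buf, "PRPC", 4) == 0;
}

// Formats exactly `n` bytes as space-separated lowercase hex.
// It never relies on a terminating NUL, so it cannot read past the buffer.
std::string HexDump(const char* buf, std::size_t n) {
    assert(buf != nullptr || n == 0);
    std::string out;
    char tmp[4];
    for (std::size_t i = 0; i < n; ++i) {
        const unsigned byte = static_cast<unsigned char>(buf[i]);
        std::snprintf(tmp, sizeof(tmp), "%02x", byte);
        if (i != 0) out += ' ';
        out += tmp;
    }
    return out;
}
```

In the debugging code above, one could log `HexDump(header_buf, n)` instead of `header_buf`, which prints only the `n` bytes actually copied out of the IOBuf.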

To Reproduce

This is my server-side code, somewhat simplified. The current implementation's order is: accept stream -> send response -> write stream.

void MyServer::MyMethod(::google::protobuf::RpcController *cntl_base, const MyRequest *request,
                        MyResponse *response, ::google::protobuf::Closure *done) {
  brpc::ClosureGuard done_guard(done);

  brpc::StreamId stream_id;
  int brpc_ret = 0;
  if (TD_UNLIKELY(0 !=
                  (brpc_ret = brpc::StreamAccept(&stream_id, *static_cast<brpc::Controller *>(cntl_base), nullptr)))) {
    response->set_success(false);
    return;
  }

  done_guard.reset(nullptr); // send the response

  butil::IOBuf serialized_message_iobuf = GenerateData(); // prepare the data
  if (TD_UNLIKELY(0 != (brpc_ret = brpc::StreamWrite(stream_id, serialized_message_iobuf)))) {
    brpc::StreamClose(stream_id);
  }
}
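
The ordering in the handler above depends on `done_guard.reset(nullptr)` running the held closure immediately, rather than at scope exit. The sketch below illustrates that behavior with a stand-in guard. `MockClosureGuard` and `RunHandler` are hypothetical names written for this illustration, and the guard only imitates what brpc::ClosureGuard is understood to do (run the closure on `reset()` or on destruction). It is not brpc code.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for brpc::ClosureGuard: runs the held callback on
// reset() or on destruction, whichever comes first.
class MockClosureGuard {
public:
    explicit MockClosureGuard(std::function<void()> done) : done_(std::move(done)) {}
    ~MockClosureGuard() { reset(nullptr); }
    MockClosureGuard(const MockClosureGuard&) = delete;
    MockClosureGuard& operator=(const MockClosureGuard&) = delete;

    // Runs the current callback (if any), then holds `next` instead.
    void reset(std::function<void()> next) {
        if (done_) done_();
        done_ = std::move(next);
    }

private:
    std::function<void()> done_;
};

// Records the order of events in a handler shaped like MyMethod above.
// `respond_first` selects between the two possible orderings.
std::vector<std::string> RunHandler(bool respond_first) {
    std::vector<std::string> events;
    {
        MockClosureGuard done_guard([&events] { events.push_back("response"); });
        if (respond_first) {
            done_guard.reset(nullptr);  // the response goes out here
        }
        events.push_back("stream_write");  // stands in for brpc::StreamWrite
    }  // if reset() was not called, the response goes out here instead
    return events;
}
```

With `respond_first` set to true the recorded order is response, then stream write. Without the explicit `reset`, the write is recorded before the response.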

This is my simplified client-side code, which combines the closure and the stream handler into one class.

class ClientClosure : public google::protobuf::Closure, public brpc::StreamInputHandler {
  enum DestructBit { k_stream_closed = 0x1, k_rpc_responded = 0x2 };

 public:
  void Run() {
    const auto destruct_guard = create_scope_guard([this] { Destruct_(DestructBit::k_rpc_responded); });
    if (TD_UNLIKELY(cntl_.Failed())) {
      brpc::StreamClose(stream_id_);
      return;
    }
    close_stream_guard.rollback();
  }

  int on_received_messages(const brpc::StreamId id, butil::IOBuf *const messages[], size_t size) final {
    assert(id == stream_id_);
    // handle messages
    ...
    brpc::StreamClose(stream_id_);
    return 0;
  }

  void on_idle_timeout(const brpc::StreamId id) final {
    assert(id == stream_id_);
    brpc::StreamClose(stream_id_);
  }

  void on_closed(const brpc::StreamId id) final {
    assert(id == stream_id_);
    Destruct_(DestructBit::k_stream_closed);
  }

  void Send(const MyRequest &request) {
    brpc::StreamOptions stream_options;
    stream_options.handler = this;
    int brpc_ret = 0;
    if (0 != (brpc_ret = brpc::StreamCreate(&stream_id_, cntl_, &stream_options))) {
      delete this;
    } else {
      MyService_Stub(channel_.get()).MyMethod(&cntl_, &request, &response_, this);
    }
  }

 private:
  // The closure will not be destroyed until both the RPC is finished and the stream is closed.
  void Destruct_(const DestructBit bit) {
    if ((k_stream_closed | k_rpc_responded) == (destruct_bitset_ |= bit)) {
      delete this;
    }
  }

  bthread::CountdownEvent &latch_;
  Channel channel_;
  MyResponse response_;
  brpc::Controller cntl_;
  brpc::StreamId stream_id_{brpc::INVALID_STREAM_ID};
  std::atomic<uint8_t> destruct_bitset_{0};
};
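
The `Destruct_` method above waits for two independent events (RPC finished, stream closed) before deleting the object. A minimal standalone sketch of that pattern follows. `TwoEventJoin` and `Arrive` are hypothetical names, and unlike the original the sketch also ignores repeated arrivals after both bits are set, which is an added assumption rather than something the issue's code does.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

enum DestructBit : std::uint8_t { kStreamClosed = 0x1, kRpcResponded = 0x2 };

// Hypothetical helper: tracks two completion events and reports which
// caller observed the second one.
class TwoEventJoin {
public:
    // Returns true only for the single call that completes the set.
    bool Arrive(DestructBit bit) {
        // fetch_or atomically sets the bit and returns the previous value.
        const std::uint8_t before = bits_.fetch_or(bit, std::memory_order_acq_rel);
        const std::uint8_t after = static_cast<std::uint8_t>(before | bit);
        return before != kAll && after == kAll;
    }

private:
    static constexpr std::uint8_t kAll = kStreamClosed | kRpcResponded;
    std::atomic<std::uint8_t> bits_{0};
};
```

The caller for which `Arrive` returns true is the one that would perform the `delete this` in the class above. Because the read and the update happen in one atomic step, two threads arriving at once cannot both see the completed set.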

Expected behavior

The RPC should not time out, and the client should receive both the response and the stream message from the server.

Versions

OS: CentOS Linux release 7.2 (Final)
Compiler: g++ (GCC) 10.2.1 20210130 (Red Hat 10.2.1-11)
brpc:
protobuf:

Additional context/screenshots

Slontia commented 1 month ago

I've located the problem. It has nothing to do with brpc's implementation, so I'm closing the issue for now.

wwbmmm commented 1 month ago

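Does the MyResponse *response need some fields set before the reply is sent? Could it be that required fields were not set, so serialization failed and no reply was sent to the client?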

Slontia commented 1 month ago

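Does the MyResponse *response need some fields set before the reply is sent? Could it be that required fields were not set, so serialization failed and no reply was sent to the client?

No, that's not needed. The response can be sent even without setting required fields. The implementation given above ultimately works fine in my environment.

The earlier, problematic implementation had the server call StreamWrite before sending the response. In that case, the data written to the stream is serialized ahead of the response. The client, however, expects to parse the response first, so parsing the response fails.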
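
The ordering constraint described in this comment can be pictured with a toy model. This is only an illustration of the explanation above, not brpc's wire format or parser: `FrameKind`, `Frame`, and `ClientCanConsume` are hypothetical names, and the rule "stream data is rejected until the response has been seen" is an assumption taken from the comment.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class FrameKind { kRpcResponse, kStreamData };

struct Frame {
    FrameKind kind;
    std::string payload;
};

// Toy client: walks the frames in arrival order and fails as soon as stream
// data shows up before the RPC response. Returns true if all frames are
// accepted and a response was seen.
bool ClientCanConsume(const std::vector<Frame>& wire) {
    bool response_seen = false;
    for (const Frame& frame : wire) {
        if (frame.kind == FrameKind::kRpcResponse) {
            response_seen = true;
        } else if (!response_seen) {
            return false;  // stream data arrived ahead of the response
        }
    }
    return response_seen;
}
```

In this model, a server that sends the response and then writes to the stream produces a sequence the client accepts, while the reversed sequence is rejected at the first frame.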