apache / brpc

brpc is an industrial-grade RPC framework written in C++. It is widely used in high-performance systems such as search, storage, machine learning, advertisement, and recommendation. "brpc" means "better RPC".
https://brpc.apache.org
Apache License 2.0

[Feature] Server-end progressive reader for http protocol #2145

Open TangSiyang2001 opened 1 year ago

TangSiyang2001 commented 1 year ago

Is your feature request related to a problem?

Apache Doris currently supports a stream-load feature over HTTP, which progressively transmits large amounts of data (1-10 GB) from the user's client to Doris' BE HTTP server (currently implemented with libevent and planned to be replaced by brpc).

According to #1233 and its related issue, brpc needs a feature of this kind.

Describe the solution you'd like

Demands:

  1. We need a server-end progressive reader, just like what the current HTTP client already supports.
  2. Because stream load requires some pre-checks once all the HTTP headers are received, and stops the transmission if a pre-check fails, we need a user-defined callback interface to control the reading process of the progressive reader.

Design:

  1. Share the ProgressiveReader interface.
  2. User code would look like:

    void example(google::protobuf::RpcController* cntl_base,
               const HttpRequest*,
               HttpResponse*,
               google::protobuf::Closure* done) {
        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);
    
        // for demand 2, process headers in the normal way, and stop early if authorization fails.
        const std::string* header = cntl->http_request().GetHeader("header-name");
        on_header(header);
    
        cntl->ReadProgressiveAttachmentBy(new MyProgressiveReader);
    }

Key points:

  1. Design a good way to configure the mapping between a URI (service method) and the progressive-read option, so that users can request a specific URI for progressive reading.
  2. Socket::read_will_be_progressive should be called before ParseHttpMessage.

Solutions:

  1. Configuration. The server end is a bit different from the client end. For a client, we clearly know it is a progressive-reading client and can configure the socket for progressive reading before starting the connection. But for a server, we cannot know in advance whether the incoming request body should be read progressively.

    Therefore, we should design an option to enable progressive reading.

    Weighing the trade-offs, we choose a service-level option rather than a method-level one: a service-level option is more natural and user-friendly, while a method-level one would need a new pattern or API to describe.

      int Server::AddServiceInternal(google::protobuf::Service *service,
                                     bool is_builtin_service,
                                     const ServiceOptions &svc_opt) {
          //...
          for (int i = 0; i < sd->method_count(); ++i) {
              const google::protobuf::MethodDescriptor *md = sd->method(i);
              MethodProperty mp;
              //...
              mp.params.pb_single_repeated_to_array = svc_opt.pb_single_repeated_to_array;
              // add an option for progressive_read here
              mp.params.enable_progressive_read = svc_opt.enable_progressive_read;
              mp.service = service;
              mp.method = md;
              mp.status = new MethodStatus;
              _method_map[md->full_name()] = mp;
              //...
          }
          //...
      }
  2. Activate progressive read in ParseHttpMessage

      ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
                             bool read_eof, const void* /*arg*/) {
          //...
    
          ssize_t rc = 0;
          if (read_eof) {
              rc = http_imsg->ParseFromArray(NULL, 0);
          } else {
              rc = http_imsg->ParseFromIOBuf(*source);
          }
          if (http_imsg->is_stage2()) {
              //...
          } else if (rc >= 0) {
              source->pop_front(rc);
              if (http_imsg->Completed()) {
                  //...
              } else if (http_imsg->stage() >= HTTP_ON_HEADERS_COMPLETE) {
                  // active progressive read here
                  http_imsg->CheckProgressiveRead();
                  if (socket->is_read_progressive()) {
                      http_imsg->AddOneRefForStage2(); // released when body is fully read
                      return MakeMessage(http_imsg);
                  }
                  return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
              } else {
                  //...
              }
          } else {
              //...
          }
    
          //...
      }
    
      void HttpContext::CheckProgressiveRead() {
          if (arg() == NULL) {
              // indicates not in server-end
              return;
          }
          // omit some pre-checks, just the main logic
          const Server::MethodProperty* const sp = FindMethodPropertyByURI(
              header().uri().path(), (Server*)arg(),
              const_cast<std::string*>(&header().unresolved_path()));
          if (sp != NULL && sp->enable_progressive_read) {
              this->setReadProgressively(true);
              socket()->read_will_be_progressive(CONNECTION_TYPE_SHORT);
          }
      }
  3. Modify ProcessHttpRequest to adapt to progressive reading. We need logic like below:

      const Server::MethodProperty* const sp =
              FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
      if (imsg_guard->read_body_progressively()) {
          // In this user-defined method, the user can call
          // `cntl.ReadProgressiveAttachmentBy(new MyProgressiveReader);`
          // but cannot get the completed body message directly.
          sp->service->CallMethod(method, cntl, req, res, done);
          if (imsg_guard->body_reader() == NULL) {
              cntl->SetFailed(EHTTP, "progressive reader is unset for a progressive method");
          }
      }

Describe alternatives you've considered

Additional context/screenshots

issue: #1233
doris issue: https://github.com/apache/doris/issues/16087

chenzhangyi commented 1 year ago
  // blocked until reading is completed
  cntl.ReadProgressiveAttachmentBy(new MyProgressiveReader);

Does this only provide a synchronous interface?


Enable the new server option in Server::StartInternal by

This should be a method-level option, or at least a service-level one. It cannot be server-level; otherwise, adding this parameter would cause undefined behavior in some services.


Why does the acceptor need to change? Shouldn't this be a socket-level concern? It should only affect ProcessHttpRequest and related code; if needed, some data can be added to the corresponding socket or messenger.

TangSiyang2001 commented 1 year ago

Does this only provide a synchronous interface?

Could we implement an asynchronous interface by passing done into the progressive reader and having the user call done->Run() at the appropriate time? If that works, the current design can cover this approach.


This should be a method-level option, or at least a service-level one. It cannot be server-level; otherwise, adding this parameter would cause undefined behavior in some services.

Indeed. I planned to implement a method-level option, but I haven't found an ideal way yet. Due to call ordering, the invoked method can only be determined after ProcessHttpRequest is called. See ProcessHttpRequest:

const Server::MethodProperty* const sp =
        FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
// ...
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;

Whether to read progressively is determined by a socket-level setting, which must be decided before the corresponding socket is created, i.e. the following needs to be called in Socket::Create:

m->read_will_be_progressive(CONNECTION_TYPE_SHORT);

This call clearly happens before ProcessHttpRequest, so a method-level option seems hard to design; the reasoning for a service-level option is similar.

However, the current approach lets users designate the methods that need a custom progressive reader; for unspecified methods, a built-in proxy reader is used, and the user's method is called back once after all the data has been transferred.


Why does the acceptor need to change? Shouldn't this be a socket-level concern? It should only affect ProcessHttpRequest and related code; if needed, some data can be added to the corresponding socket or messenger.

If we follow the above rule that Socket::Create must call

m->read_will_be_progressive(CONNECTION_TYPE_SHORT);

then the corresponding SocketOption must be passed in before Acceptor::OnNewConnectionsUntilEAGAIN calls Socket::Create. Under the original design, the Acceptor would need to record the corresponding ServerOption so that it can set the correct SocketOption when creating the socket.

chenzhangyi commented 1 year ago

@TangSiyang2001 When parsing an HTTP message, we can tell whether the headers are complete. If we make the decision after that point, wouldn't it simplify the problem? The method-related information is already available in the headers.

TangSiyang2001 commented 1 year ago

@TangSiyang2001 When parsing an HTTP message, we can tell whether the headers are complete. If we make the decision after that point, wouldn't it simplify the problem? The method-related information is already available in the headers.

Indeed, good point. I'll revise the design.

chenBright commented 2 months ago

Could we implement an asynchronous interface by passing done into the progressive reader and having the user call done->Run() at the appropriate time? If that works, the current design can cover this approach.

@TangSiyang2001 The asynchronous interface has a use-after-free problem in the small-packet scenario. Running HttpTest.server_end_read_failed with ASan reports heap-use-after-free. Below is the call stack reported by ASan:

    #0 0x7f63659a2b83 in brpc::HttpMessage::SetBodyReader(brpc::ProgressiveReader*) /home/runner/work/brpc/brpc/src/brpc/details/http_message.cpp:384:13
    #1 0x6aa3e5 in brpc::policy::HttpContext::ReadProgressiveAttachmentBy(brpc::ProgressiveReader*) /home/runner/work/brpc/brpc/test/../src/brpc/policy/http_rpc_protocol.h:118:16
    #2 0x7f636586f505 in brpc::Controller::ReadProgressiveAttachmentBy(brpc::ProgressiveReader*) /home/runner/work/brpc/brpc/src/brpc/controller.cpp:1490:18
    #3 0x66a23c in (anonymous namespace)::UploadServiceImpl::UploadFailed(google::protobuf::RpcController*, test::HttpRequest const*, test::HttpResponse*, google::protobuf::Closure*) /home/runner/work/brpc/brpc/test/brpc_http_rpc_protocol_unittest.cpp:1100:15
    #4 0x622242 in test::UploadService::CallMethod(google::protobuf::MethodDescriptor const*, google::protobuf::RpcController*, google::protobuf::Message const*, google::protobuf::Message*, google::protobuf::Closure*) /home/runner/work/brpc/brpc/test/echo.pb.cc:3291:7
    #5 0x7f6365a7575f in brpc::policy::ProcessHttpRequest(brpc::InputMessageBase*) /home/runner/work/brpc/brpc/src/brpc/policy/http_rpc_protocol.cpp:1610:21
    #6 0x7f63657cbfca in brpc::ProcessInputMessage(void*) /home/runner/work/brpc/brpc/src/brpc/input_messenger.cpp:177:5
    #7 0x7f636557b5ad in bthread::TaskGroup::task_runner(long) /home/runner/work/brpc/brpc/src/bthread/task_group.cpp:345:29
    #8 0x7f6365588040 in bthread_make_fcontext (libbrpc.dbg.so+0x567040)
0x6170000622d8 is located 600 bytes inside of 736-byte region [0x617000062080,0x617000062360)

freed by thread T8 (brpc_wkr:0-5) here:
    #0 0x4eb30d in operator delete(void*) (/home/runner/work/brpc/brpc/test/brpc_http_rpc_protocol_unittest+0x4eb30d)
    #1 0x6aa3b7 in brpc::policy::HttpContext::~HttpContext() /home/runner/work/brpc/brpc/test/../src/brpc/policy/http_rpc_protocol.h:86:7
    #2 0x7f63658615bb in brpc::SharedObject::RemoveRefManually() /home/runner/work/brpc/brpc/./src/brpc/shared_object.h:52:13
    #3 0x7f63658615bb in brpc::intrusive_ptr_release(brpc::SharedObject*) /home/runner/work/brpc/brpc/./src/brpc/shared_object.h:67:10
    #4 0x7f63658615bb in butil::intrusive_ptr<brpc::ReadableProgressiveAttachment>::~intrusive_ptr() /home/runner/work/brpc/brpc/./src/butil/intrusive_ptr.hpp:89:21
    #5 0x7f63658615bb in butil::intrusive_ptr<brpc::ReadableProgressiveAttachment>::reset(brpc::ReadableProgressiveAttachment*) /home/runner/work/brpc/brpc/./src/butil/intrusive_ptr.hpp:124:9
    #6 0x7f63658615bb in brpc::Controller::ResetNonPods() /home/runner/work/brpc/brpc/src/brpc/controller.cpp:231:14
    #7 0x7f636586035c in brpc::Controller::~Controller() /home/runner/work/brpc/brpc/src/brpc/controller.cpp:165:5
    #8 0x7f6365861c3d in brpc::Controller::~Controller() /home/runner/work/brpc/brpc/src/brpc/controller.cpp:160:27
    #9 0x7f63657a3131 in brpc::LogErrorTextAndDelete::operator()(brpc::Controller*) const /home/runner/work/brpc/brpc/src/brpc/protocol.cpp:271:9
    #10 0x7f6365a6df89 in std::unique_ptr<brpc::Controller, brpc::LogErrorTextAndDelete>::~unique_ptr() /usr/bin/../lib/gcc/x86_64-linux-gnu/10/../../../../include/c++/10/bits/unique_ptr.h:361:4
    #11 0x7f6365a6df89 in brpc::policy::HttpResponseSender::~HttpResponseSender() /home/runner/work/brpc/brpc/src/brpc/policy/http_rpc_protocol.cpp:977:1
    #12 0x7f6365a79dcf in brpc::policy::HttpResponseSenderAsDone::~HttpResponseSenderAsDone() /home/runner/work/brpc/brpc/src/brpc/policy/http_rpc_protocol.cpp:742:7
    #13 0x7f6365a79dcf in brpc::policy::HttpResponseSenderAsDone::~HttpResponseSenderAsDone() /home/runner/work/brpc/brpc/src/brpc/policy/http_rpc_protocol.cpp:742:7
    #14 0x69c1a7 in brpc::ClosureGuard::~ClosureGuard() /home/runner/work/brpc/brpc/test/../src/brpc/closure_guard.h:39:20
    #15 0x66bc3e in (anonymous namespace)::ServerAlwaysFailReader::OnEndOfMessage(butil::Status const&) /home/runner/work/brpc/brpc/test/brpc_http_rpc_protocol_unittest.cpp:1075:5
    #16 0x7f63659a28d3 in brpc::HttpMessage::SetBodyReader(brpc::ProgressiveReader*) /home/runner/work/brpc/brpc/src/brpc/details/http_message.cpp:411:20
    #17 0x6aa3e5 in brpc::policy::HttpContext::ReadProgressiveAttachmentBy(brpc::ProgressiveReader*) /home/runner/work/brpc/brpc/test/../src/brpc/policy/http_rpc_protocol.h:118:16
    #18 0x7f636586f505 in brpc::Controller::ReadProgressiveAttachmentBy(brpc::ProgressiveReader*) /home/runner/work/brpc/brpc/src/brpc/controller.cpp:1490:18
    #19 0x66a23c in (anonymous namespace)::UploadServiceImpl::UploadFailed(google::protobuf::RpcController*, test::HttpRequest const*, test::HttpResponse*, google::protobuf::Closure*) /home/runner/work/brpc/brpc/test/brpc_http_rpc_protocol_unittest.cpp:1100:15
    #20 0x622242 in test::UploadService::CallMethod(google::protobuf::MethodDescriptor const*, google::protobuf::RpcController*, google::protobuf::Message const*, google::protobuf::Message*, google::protobuf::Closure*) /home/runner/work/brpc/brpc/test/echo.pb.cc:3291:7
    #21 0x7f6365a7575f in brpc::policy::ProcessHttpRequest(brpc::InputMessageBase*) /home/runner/work/brpc/brpc/src/brpc/policy/http_rpc_protocol.cpp:1610:21
    #22 0x7f63657cbfca in brpc::ProcessInputMessage(void*) /home/runner/work/brpc/brpc/src/brpc/input_messenger.cpp:177:5
    #23 0x7f636557b5ad in bthread::TaskGroup::task_runner(long) /home/runner/work/brpc/brpc/src/bthread/task_group.cpp:345:29
    #24 0x7f6365588040 in bthread_make_fcontext (libbrpc.dbg.so+0x567040)
previously allocated by thread T8 (brpc_wkr:0-5) here:
    #0 0x4eaccd in operator new(unsigned long, std::nothrow_t const&) (/home/runner/work/brpc/brpc/test/brpc_http_rpc_protocol_unittest+0x4eaccd)
    #1 0x7f6365a6ff3a in brpc::policy::ParseHttpMessage(butil::IOBuf*, brpc::Socket*, bool, void const*) /home/runner/work/brpc/brpc/src/brpc/policy/http_rpc_protocol.cpp:1115:21
    #2 0x7f63657cb2bc in brpc::InputMessenger::CutInputMessage(brpc::Socket*, unsigned long*, bool) /home/runner/work/brpc/brpc/src/brpc/input_messenger.cpp:149:30
    #3 0x7f63657cc579 in brpc::InputMessenger::ProcessNewMessage(brpc::Socket*, long, bool, unsigned long, unsigned long, brpc::InputMessenger::InputMessageClosure&) /home/runner/work/brpc/brpc/src/brpc/input_messenger.cpp:237:26
    #4 0x7f63657cdeea in brpc::InputMessenger::OnNewMessages(brpc::Socket*) /home/runner/work/brpc/brpc/src/brpc/input_messenger.cpp:392:62
    #5 0x7f63658fbb79 in brpc::Socket::ProcessEvent(void*) /home/runner/work/brpc/brpc/src/brpc/socket.cpp:1186:5
    #6 0x7f636557b5ad in bthread::TaskGroup::task_runner(long) /home/runner/work/brpc/brpc/src/bthread/task_group.cpp:345:29
    #7 0x7f6365588040 in bthread_make_fcontext (libbrpc.dbg.so+0x567040)

The server-side call chain after a request is received is: cntl->ReadProgressiveAttachmentBy -> (HttpContext)_rpa->ReadProgressiveAttachmentBy -> HttpMessage::SetBodyReader -> r->OnReadOnePart, r->OnEndOfMessage -> brpc::~ClosureGuard. The call sites of r->OnReadOnePart and r->OnEndOfMessage are:

https://github.com/apache/brpc/blob/2c6644be83d7b016693ab360832424f0069842cf/src/brpc/details/http_message.cpp#L407-L419

Because the received HTTP packet is small, the server receives and parses it in one shot, so in the end only cntl holds _rpa (the HttpMessage). Before r->OnEndOfMessage, i.e. ServerAlwaysFailReader::OnEndOfMessage, returns, the destruction of done_guard sends the response back to the client and reclaims cntl; _rpa (the HttpMessage) is therefore reclaimed as well, and the subsequent access to the member variable _body_reader in HttpMessage::SetBodyReader crashes.

https://github.com/apache/brpc/blob/2c6644be83d7b016693ab360832424f0069842cf/src/brpc/details/http_message.cpp#L384-L388

TangSiyang2001 commented 2 months ago

@chenBright Got it. If it's not urgent, I'll fix it when I have free time.