apache / brpc

brpc is an Industrial-grade RPC framework using C++ Language, which is often used in high performance system such as Search, Storage, Machine learning, Advertisement, Recommendation etc. "brpc" means "better RPC".
https://brpc.apache.org
Apache License 2.0
16.54k stars 3.97k forks source link

BRPC兼容GRPC stream #1589

Open KaneVV1 opened 3 years ago

KaneVV1 commented 3 years ago

Is your feature request related to a problem? (你需要的功能是否与某个问题有关?)

  stream是rpc框架使用中的常用功能,虽然brpc有streaming rpc,但是兼容grpc stream能给框架的这部分功能带来更大的泛用性。   经过百度内部Service Mesh实践,出于定制化需求方向的考虑,我们希望在proxyless模式下,brpc能够直连istio,逐渐减弱对envoy的依赖。istio下发配置使用的是双向grpc stream,所以我们需要完成brpc兼容grpc stream的适配。  主要需求如下:   1、兼容grpc steam的同步/异步或订阅推送式的客户端;   2、一如既往简单便捷的客户端API,不希望定制protobuf插件。  后续事项:*服务端API设计、直达底层h2协议的性能调优

Describe the solution you'd like (描述你期望的解决方法)

以grpc stream Demo的消息格式 https://github.com/grpc/grpc/blob/fd3bd70939fb4239639fbd26143ec416366e4157/examples/python/data_transmission/demo.proto 为例,可以通过以下API来对grpc stream服务端进行访问。

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include <brpc/stream_channel.h>
#include <brpc/grpc_stream.h>

#include "grpcdemo.pb.h"

DEFINE_string(server, "0.0.0.0:8000", "IP Address of server");
DEFINE_string(protocol, "h2:grpc", "Protocol type. Defined in protocol/baidu/rpc/options.proto");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");

using namespace brpc;

int main(int argc, char* argv[]) {
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

    StreamChannel channel;

    StreamOptions options;
    options.protocol = FLAGS_protocol;
    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
    options.max_retry = FLAGS_max_retry;
    if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
        LOG(ERROR) << "Fail to initialize channel";
        return -1;
    }

    demo::GRPCDemo_Stub stub(&channel);

    // 以下参照了grpc sync_stream的设计

    {
        // 客户端流模式(在一次调用中, 客户端可以多次向服务器传输数据, 但是服务器只能返回一次响应)
        // rpc ClientStreamingMethod (stream Request) returns (Response);
        demo::Request req;
        demo::Response res;
        Controller cntl;
        auto writer = CreateStreamVisitor<demo::Request, GrpcWriter>(
            &demo::GRPCDemo_Stub::ClientStreamingMethod, &stub, &res, &cntl);

        req.set_request_data("Hello!");
        writer->Write(req);

        req.set_request_data("World!");
        writer->Write(req);
        writer->WritesDone(); 

        writer->Finish();
        if (!cntl.Failed()) {
            LOG(INFO) << "ClientStreamingMethod wrote req, res=" << res.response_data();
        } else {
            LOG(WARNING) << "ClientStreamingMethod failed, error=" << cntl.ErrorText();
        }
    }

    {
        // 服务端流模式(在一次调用中, 客户端只能一次向服务器传输数据, 但是服务器可以多次返回响应)
        // rpc ServerStreamingMethod (Request) returns (stream Response);
        demo::Request req;
        demo::Response res;
        brpc::Controller cntl;
        auto reader = CreateStreamVisitor<demo::Response, GrpcReader>(
            &demo::GRPCDemo_Stub::ServerStreamingMethod, &stub, &req, &cntl);
        while (reader->Read(&res)) {
            LOG(INFO) << "ServerStreamingMethod read res, res=" << res.response_data();
        }
        reader->Finish();
        if (cntl.Failed()) {
            LOG(WARNING) << "ServerStreamingMethod failed, error=" << cntl.ErrorText();
        }
    }

    {
        // 双向流模式 (在一次调用中, 客户端和服务器都可以向对方多次收发数据)
        // rpc BidirectionalStreamingMethod (stream Request) returns (stream Response);
        demo::Request req;
        demo::Response res;
        brpc::Controller cntl;
        auto stream = CreateStreamVisitor<demo::Request, demo::Response, GrpcReaderWriter>(
            &demo::GRPCDemo_Stub::BidirectionalStreamingMethod, &stub, &cntl);
        req.set_request_data("Hello!");
        std::thread writer([stream]() {
            for (int i = 0; i < 3; ++i) {
                stream->Write(req);
            }
            // 全双工每个方向需要单独关闭
            // 阻塞到以上所有写入操作完成,并关闭写入
            // 业务最好等全部写完
            stream->WritesDone();
            if (cntl.Failed()) {
                LOG(WARNING) << "BidirectionalStreamingMethod writing failed, error=" << cntl.ErrorText();
            }
        });

        while (stream->Read(&res)) {
            LOG(INFO) << "BidirectionalStreamingMethod read res, res=" << res.response_data();
        }
        writer.join();
        // 阻塞到两端都写完读完,关闭该流
        stream->Finish();
        if (cntl.Failed()) {
            LOG(WARNING) << "BidirectionalStreamingMethod failed, error=" << cntl.ErrorText();
        }
    }

    LOG(INFO) << "StreamEchoClient is going to quit";
    return 0;
}

具体设计 20220818更新设计概图 image TBD

Describe alternatives you've considered (描述你想到的折衷方案) 简单地直接引入GRPC依赖

Additional context/screenshots (更多上下文/截图)

chenzhangyi commented 3 years ago

grpc stream最大的问题就是没有异步接口。 另外上面这个demo中,好像看不到错误处理的部分? stream不能保证是可靠的。

KaneVV1 commented 3 years ago

grpc stream最大的问题就是没有异步接口。 另外上面这个demo中,好像看不到错误处理的部分? stream不能保证是可靠的。

异步接口后面会有计划,至于错误处理,暂时打算放在controller里

chenzhangyi commented 3 years ago

接口好像有点多。 WritesDone和Finish是什么关系?可不可以合为一个。 writer->Finish()和stream->Finisher()又是什么关系? 没发送完毕,业务要中途退出是要如何处理(writer/reader端都可能发生)? 本身rpc接口中的done这里要如何处理? 这个case并没有看到。 能已echo为例子写一个实际的包含错误处理的例子么。

KaneVV1 commented 3 years ago

接口好像有点多。 WritesDone和Finish是什么关系?可不可以合为一个。 writer->Finish()和stream->Finisher()又是什么关系? 没发送完毕,业务要中途退出是要如何处理(writer/reader端都可能发生)? 本身rpc接口中的done这里要如何处理? 这个case并没有看到。 能已echo为例子写一个实际的包含错误处理的例子么。

已更新case 1、2 最好不要合一个,WritesDone用来保证发送端数据完整性,Finish来关闭整个流; 3、done还未考虑

chenzhangyi commented 3 years ago

接口好像有点多。 WritesDone和Finish是什么关系?可不可以合为一个。 writer->Finish()和stream->Finisher()又是什么关系? 没发送完毕,业务要中途退出是要如何处理(writer/reader端都可能发生)? 本身rpc接口中的done这里要如何处理? 这个case并没有看到。 能已echo为例子写一个实际的包含错误处理的例子么。

已更新case 1、2 最好不要合一个,WritesDone用来保证发送端数据完整性,Finish来关闭整个流; 3、done还未考虑

调用done->Run()也可以关闭整个流? 看起来一件事情似乎N个接口都可以做,这个可能已经不是很好的迹象了。

参考tcp的接口,只有close, 没有reset. 甚至连half close都没提供。因为对于调用者来说,这几个接口一般都是同时调用的。 这可能已经意味着只需要保留一个。除非能找到常见的case这两个需要分别调用。否则最好是只保留一个。有遇到特殊情况再提供能分开调用的函数。比如默认就是Close(). 有特殊case再提供CloseRead(), CloseWrite().

另外一个问题, 这个stream接口双工/单工的是如何设置的?

KaneVV1 commented 2 years ago

接口好像有点多。 WritesDone和Finish是什么关系?可不可以合为一个。 writer->Finish()和stream->Finisher()又是什么关系? 没发送完毕,业务要中途退出是要如何处理(writer/reader端都可能发生)? 本身rpc接口中的done这里要如何处理? 这个case并没有看到。 能已echo为例子写一个实际的包含错误处理的例子么。

已更新case 1、2 最好不要合一个,WritesDone用来保证发送端数据完整性,Finish来关闭整个流; 3、done还未考虑

调用done->Run()也可以关闭整个流? 看起来一件事情似乎N个接口都可以做,这个可能已经不是很好的迹象了。

参考tcp的接口,只有close, 没有reset. 甚至连half close都没提供。因为对于调用者来说,这几个接口一般都是同时调用的。 这可能已经意味着只需要保留一个。除非能找到常见的case这两个需要分别调用。否则最好是只保留一个。有遇到特殊情况再提供能分开调用的函数。比如默认就是Close(). 有特殊case再提供CloseRead(), CloseWrite().

另外一个问题, 这个stream接口双工/单工的是如何设置的?

目前我们正在开发中,对于超出我们内部的、更加普遍的应用场景,需要更加详细的设计,到时候再与您商讨。

liyichao commented 2 years ago

这个目前状态如何?

KaneVV1 commented 2 years ago

这个目前状态如何?

正在开发

liubo-bj commented 2 years ago

请问现在的状态是啥,计划什么时候完成

egolearner commented 1 year ago

Hi @KaneVV1 ,请问现在什么状态了?