sofastack / sofa-rpc

SOFARPC is a high-performance, high-extensibility, production-level Java RPC framework.
https://www.sofastack.tech/sofa-rpc/docs/Home
Apache License 2.0
3.81k stars 1.17k forks source link

Triple stream support (issue#1332) #1360

Closed namelessssssssssss closed 1 month ago

namelessssssssssss commented 10 months ago

Motivation:

This PR can provides stream transport support for triple protocol, including server streaming and bidirectional/client streaming.

Modification:

Change log (Chinese) :

概述

Stream 方式是一种异步的流式处理方式,可以在数据传输过程中逐个处理数据,避免一次性传输大量数据造成的性能问题。服务端Stream 是指服务端在处理请求时,将数据分成多个部分逐个返回给客户端的过程;客户端 Stream 是指客户端在请求服务器时,将请求参数分成多个部分逐个发送给服务器的过程。Stream 方式可以让我们在处理大量数据时更高效地使用资源,提高系统的性能和响应速度。

业务场景

流式传输适配下列业务场景:

实现的流传输应具有以下语义保证:

流传输模型

对于流式传输,我们可以将Stream分为四种类型 (借鉴于gRPC):

Unary,传统请求-响应模型:客户端一次请求,服务端一次响应

ClientStream,客户端流:客户端多次请求,服务端最终返回一次响应,服务端接收请求期间可以返回已完成/取消信号来停止客户端继续发送请求,或者在客户端请求完成后返回响应。

ServerStream,服务端流:客户端单次请求,服务端返回多次响应,客户端接收请求期间可以返回已完成/取消信号来停止接受服务端的响应,或者等待服务端发送完成相应信号。

BidirectionalStream,双向流:客户端发送一次请求后,客户端-服务端均可相互多次乱序发送请求,直到任意一方发送已完成/取消信号。

流传输接口定义

我们先从RPC调用的最上层开始分析。首先,我们需要定义客户端/服务端用于接收/发送流式请求的接口,该接口定义了如何发送、接收、处理数据流,以及数据流的异常、结束操作

考虑到目前传输层的实现,此处可以考虑直接使用 gRPC 中的 StreamObserver,这样对于 Triple 协议的传输层实现来说比较方便(TripleClientInvoker 直接依赖 gRPC 进行传输),但缺点是可能导致其它未来可能支持流传输的协议同时耦合了gRPC的API。

更好的方案是使用自定义的 StreamObserver(此处先叫做StreamHandler),将其和gRPC解耦,在 Triple 传输层通过适配器转换为gRPC StreamObserver。此处使用自定义的StreamHandler。

我们为该接口定义以下方法,实际和gRPC中的StreamObserver一致:

//泛型为消息类型
public interface StreamHandler<T> {
   /**
    * 接收者通过实现该方法,定义单条信息的处理逻辑
    * 发送者则通过调用该方法发送一条信息
    */
    void onMessage(T message);

    /**
    * 接收者通过该方法定义信息发送完成后的处理逻辑
    * 发送者通过调用该方法告知接收者所有信息发送完毕
    */
    void onFinish();

    /**
    * 接收者通过该方法定义处理某条消息发生异常时的处理逻辑 (onMessage方法抛出异常时)
    * 发送者不应调用该方法
    */
    void onException(Throwable throwable);

}

全局处理

1,标记请求类型

首先是识别方法调用类型,并为 SofaRequest 打上流传输的标记。

对于客户端来说,SofaRequest在客户端代理中创建。其中标记传输类型的字段在DefaultClientProxyInvoker中的 decorateRequest方法中设置(对于泛型调用)。

//DefaultClientProxyInvoker 
@Override
    protected void decorateRequest(SofaRequest request) {
        // 公共的设置
        super.decorateRequest(request);

        // 缓存是为了加快速度
        request.setTargetServiceUniqueName(serviceName);
        request.setSerializeType(serializeType == null ? 0 : serializeType);

        if (!consumerConfig.isGeneric()) {
            // 找到调用类型, generic的时候类型在filter里进行判断
             request.setInvokeType(consumerConfig.getMethodInvokeType(request));
        }
        ......
    }

setInvokeType方法最终调用ConsumerConfig中的getMethodInvokeType,尝试从接口配置中获取已缓存的调用方式,否则使用默认值(一元调用)。可以将判断调用类型的操作添加在其中的 getMethodInvokeType 中,这样也可以将方法调用模式缓存,防止每次调用都要进行繁琐的判断操作。

修改后的方法:

  //ConsumerConfig
  public String getMethodInvokeType(SofaRequest sofaRequest) {
        String methodName = sofaRequest.getMethodName();
        String invokeType = (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE,null);

        if(invokeType == null) {
            invokeType = getAndCacheCallType(sofaRequest);
        }

        return invokeType;
    }
     /**
     * Get and cache the call type of certain method
     * @param request RPC request
     * @return request call type
     */
    public String getAndCacheCallType(SofaRequest request){
         Method method  = request.getMethod();
         String callType = MethodConfig.mapStreamType(
                 method,
                 (String) getMethodConfigValue(request.getMethodName(), RpcConstants.CONFIG_KEY_INVOKE_TYPE, getInvokeType())
         );
         //Method level config
         updateAttribute(buildMethodConfigKey(request,RpcConstants.CONFIG_KEY_INVOKE_TYPE),callType,true);
         return callType;
    }
//MethodConfig
/**
     * Gets the stream call type of certain method
     * @param method the method
     * @return call type,server/client/bidirectional stream or default value. If not mapped to any stream call type, use the default value
     */
    public static String mapStreamType(Method method, String defaultValue){
        Class<?>[] paramClasses = method.getParameterTypes();
        Class<?> returnClass = method.getReturnType();

        int paramLen = paramClasses.length;
        String callType;

        //BidirectionalStream
        if(paramLen>0 && paramClasses[0].isAssignableFrom(StreamHandler.class) && returnClass.isAssignableFrom(StreamHandler.class)){

            if(paramLen > 1){
                throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Bidirectional stream method parameters can be only one StreamHandler.");
            }

            callType = RpcConstants.INVOKER_TYPE_BI_STREAMING;
        }
        //ServerStream
        else if ( paramLen>1 && paramClasses[1].isAssignableFrom(StreamHandler.class) && returnClass.isAssignableFrom(StreamHandler.class)){
            callType = RpcConstants.INVOKER_TYPE_SERVER_STREAMING;
        }
        //ClientStream
        else if (returnClass.isAssignableFrom(StreamHandler.class) && Arrays.stream(paramClasses).noneMatch(clazz -> clazz.isAssignableFrom(StreamHandler.class))) {
            callType = RpcConstants.INVOKER_TYPE_CLIENT_STREAMING;
        }
        else if (returnClass.isAssignableFrom(StreamHandler.class) || Arrays.stream(paramClasses).anyMatch(clazz -> clazz.isAssignableFrom(StreamHandler.class))) {
            throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only appear at the specified location of parameter. Please check related docs.");
        }
        //Other call types
        else {
            callType = defaultValue;
        }

        return callType;
    }

主要完成以下三个操作:

此处将调用方式缓存到了MethodConfig中,因为InterfaceConfig使用的是UnmodifiableMap,在其初始化完成后已不能再添加新的信息。由于 MethodConfig 本身没有保存 Method 引用,且不会默认创建,使得获取方法参数及返回值的具体类型有些困难,需要在实际请求时通过 SofaRequest 拿到具体的参数 Class,才可较简单的判断方法参数及返回值中StreamHandler的出现情况,判断请求类型。

2,修改AbstractCluster.doSendMsg

在该方法中通过获取SofaRequest的RequestType,判断是使用同步、异步、回调还是其它调用方法。对于流式调用,需要在其中添加新请求类型的处理逻辑。

 protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,sofaRequest request) throws SofaRpcException {
   ...
        try {
   ...
            // 同步调用
            if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
    ...
            }
            // 单向调用
            else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
    ...
            }
            // Callback调用
            else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
    ...
            }
            // Future调用
            else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
   ...
            }
            // 流式调用
            else if (RpcConstants.INVOKER_TYPE_STREAM_CLIENT.equals(invokeType)
                    || RpcConstants.INVOKER_TYPE_STREAM_BI.equals(invokeType)
                    || RpcConstants.INVOKER_TYPE_STREAM_SERVER.equals(invokeType)) { 
                response = transport.syncSend(request, Integer.MAX_VALUE);
            }
            else {
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
            } 
            return response;
        } catch (SofaRpcException e) {
            throw e;
        } catch (Throwable e) { // 客户端其它异常
            throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e);
        }
    }

根据流式传输方法的定义,如果客户端希望通过Stream向服务端再次发送信息,它需要通过调用RPC方法返回的StreamHandler中的onMessage方法来向服务端发送下一条信息。

消费者对流传输方法的第一次调用实际是向提供者注册传输会话,且需要依赖返回的StreamHandler向提供者发送后续信息(且需要确保StreamHandler已初始化完毕,保证消费者直接使用其发送消息不会出错),此处选择同步调用。

协议处理

Triple协议

SofaRPC 的 Triple 协议传输层实现目前直接依赖 gRPC 进行。

泛型调用

当 TripleClientInvoker 中 useGeneric 字段为 false 时,表示消费者调用的服务存在IDL及对应的Stub,可以直接通过生成的Stub进行调用。当 useGeneric 为 true 时,将在运行时动态指定调用的服务及方法,借助底层的泛型服务完成传输。

因此,Triple协议的修改分为两个方面:非泛型调用时,通过服务接口生成的stub进行流式调用;以及泛型调用时,通过预生成的GenericService stub发起流式调用。

IDL修改

对于泛型调用,Triple协议通过将已序列化完成的请求统一封装为 Request,通过预生成的gRPC stub完成传输过程。

因此需要修改transformer.proto, 添加流式调用方法的定义。 考虑客户端流和双向流使用可能使用相同的调用方法,此处没有单独定义客户端流的传输方法。

syntax = "proto3";
option java_multiple_files = true;
option java_package = "triple";
option java_outer_classname = "GenericProto";

service GenericService {
    rpc generic (Request) returns (Response) {}

    rpc genericBiStream (stream Request) returns (stream Response){}

    rpc genericServerStream(Request) returns (stream Response){}
}

message Request {
    string serializeType = 1;
    repeated bytes  args = 2;
    repeated string argTypes = 3;
}

message Response {
    string serializeType = 1;
    bytes  data = 2;
    string type = 3;
}

之后,服务端实现新的泛型服务方法

public class GenericServiceImpl extends SofaGenericServiceTriple.GenericServiceImplBase {
//双向流调用实现
   @Override
    public StreamObserver<Request> genericBiStream(StreamObserver<Response> responseObserver) {
        Method serviceMethod = getBidirectionalStreamRequestMethod();
        //通过上下文创建请求
        SofaRequest sofaRequest = TracingContextKey.getKeySofaRequest().get(Context.current());

        if (serviceMethod == null) {
            throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find invoke method " +
                sofaRequest.getMethodName());
        }
        String methodName = serviceMethod.getName();
        try {
            ResponseSerializeStreamHandler serverResponseHandler = new ResponseSerializeStreamHandler(responseObserver,
                getSerialization());

            setBidirectionalStreamRequestParams(sofaRequest, serviceMethod, serverResponseHandler);

            SofaResponse sofaResponse = invoker.invoke(sofaRequest);

            StreamHandler<Object> clientHandler = (StreamHandler<Object>) sofaResponse.getAppResponse();

            return new StreamObserver<Request>() {
                volatile Serializer serializer = null;

                volatile Class<?>[] argTypes   = null;

                @Override
                public void onNext(Request request) {
                    checkInitialize(request);
                    Object message = getInvokeArgs(request, argTypes, serializer, false)[0];
                    clientHandler.onMessage(message);
                }

                private void checkInitialize(Request request) {
                    if (serializer == null && argTypes == null) {
                        synchronized (this) {
                            if (serializer == null && argTypes == null) {
                                serializer = SerializerFactory.getSerializer(request.getSerializeType());
                                argTypes = getArgTypes(request, false);
                            }
                        }
                    }
                }

                @Override
                public void onError(Throwable t) {
                    clientHandler.onException(t);
                }

                @Override
                public void onCompleted() {
                    clientHandler.onFinish();
                }
            };
        } catch (Exception e) {
            LOGGER.error("Invoke " + methodName + " error:", e);
            throw new SofaRpcRuntimeException(e);
        } finally {
            Thread.currentThread().setContextClassLoader(Thread.currentThread().getContextClassLoader());
        }
    }

以上实现的主要难点在于,流调用方法传入及返回的均为StreamHandler,这意味着在实际请求到来之前无法得知具体的请求类型。因此以上的实现中,序列化及方法参数信息是在接收到第一次请求之后才被初始化的。

Triple传输层修改

*由于实际上三种流传输的底层流程差别不大,以下的修改示例均为双向流(BidirectionalStream)。

客户端

以下为对TripleClientInvoker相关的修改。

重构invoke方法,根据callType区分为三种调用流程

    @Override
    public SofaResponse invoke(SofaRequest sofaRequest, int timeout)
            throws Exception {

        MethodDescriptor.MethodType callType = mapCallType(sofaRequest);

        if(!useGeneric){
            //非泛型,根据生成的Stub调用
            return stubCall(sofaRequest,timeout);
        } else if (callType.equals(MethodDescriptor.MethodType.UNARY)) {
            return unaryCall(sofaRequest, timeout);
        } else {
            return streamCall(sofaRequest, timeout, callType);
        }
    }

然后,根据具体调用方式选择具体调用逻辑

private SofaResponse streamCall(SofaRequest sofaRequest, int timeout, boolean useGeneric,MethodDescriptor.MethodType callType) {
         switch (callType){
             case BIDI_STREAMING:
                 return binaryCall(sofaRequest,timeout,useGeneric);
             case CLIENT_STREAMING:
                 return clientStreamCall(sofaRequest,timeout,useGeneric);
             case SERVER_STREAMING:
                 return serverStreamCall(sofaRequest,timeout,useGeneric);
             default:
                 throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Unknown stream call type:"+callType);
         }
    }

实现双向流传输

private SofaResponse binaryStreamCall(SofaRequest sofaRequest, int timeout) {
        StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[0];

        MethodDescriptor<Request, Response> methodDescriptor = getMethodDescriptor(sofaRequest);
        ClientCall<Request, Response> call = channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout));

        StreamObserver<Request> observer = ClientCalls.asyncBidiStreamingCall(
                call,
                new ClientStreamObserverAdapter(
                        streamHandler,
                        sofaRequest.getSerializeType()
                )
        );
        StreamHandler<Request> handler = new StreamHandler() {
            @Override
            public void onMessage(Object message) {
                SofaRequest request = MessageBuilder.copyEmptyRequest(sofaRequest);
                Object[] args = new Object[]{message};
                request.setMethodArgs(args);
                request.setMethodArgSigs(rebuildTrueRequestArgSigs(args));
                Request req = getRequest(request, serialization, serializer, 0);
                observer.onNext(req);
            }

            @Override
            public void onFinish() {
                observer.onCompleted();
            }

            @Override
            public void onException(Throwable throwable) {
                observer.onError(TripleExceptionUtils.asStatusRuntimeException(throwable));
            }
        };
        SofaResponse sofaResponse = new SofaResponse();
        sofaResponse.setAppResponse(handler);
        return sofaResponse;
    }

为MethodDescriptor设置新的调用类型

    private MethodDescriptor<Request,Response> getMethodDescriptor(SofaRequest sofaRequest) {
        String serviceName = sofaRequest.getInterfaceName();
        String methodName = sofaRequest.getMethodName();
        MethodDescriptor.Marshaller<?> requestMarshaller = ProtoUtils.marshaller(Request.getDefaultInstance());
        MethodDescriptor.Marshaller<?> responseMarshaller = ProtoUtils.marshaller(Response.getDefaultInstance());
        String fullMethodName = generateFullMethodName(serviceName, methodName);

        MethodDescriptor.Builder builder = MethodDescriptor
                .newBuilder()
                .setFullMethodName(fullMethodName)
                .setSampledToLocalTracing(true)
                .setRequestMarshaller((MethodDescriptor.Marshaller<Object>) requestMarshaller)
                .setResponseMarshaller((MethodDescriptor.Marshaller<Object>) responseMarshaller);

        MethodDescriptor.MethodType callType = SofaProtoUtils.mapGrpcCallType(sofaRequest.getInvokeType());
        builder.setType(callType);
        return builder.build();
    }

添加工具方法:

//SofaProtoUtils
    public static MethodDescriptor.MethodType mapGrpcCallType(String callType){
        switch (callType){
            case INVOKER_TYPE_ONEWAY:
            case INVOKER_TYPE_FUTURE:
            case INVOKER_TYPE_CALLBACK:
            case INVOKER_TYPE_SYNC:
                return MethodDescriptor.MethodType.UNARY;
            case INVOKER_TYPE_BI_STREAMING:
                return MethodDescriptor.MethodType.BIDI_STREAMING;
            case INVOKER_TYPE_CLIENT_STREAMING:
                return MethodDescriptor.MethodType.CLIENT_STREAMING;
            case INVOKER_TYPE_SERVER_STREAMING:
                return MethodDescriptor.MethodType.SERVER_STREAMING;
            default:
                throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "Unsupported invoke type:" + callType);
        }
    }

服务端

方法绑定问题

对于服务端,泛型调用的入口是 GenericService 的实现,即 GenericSerivceImpl。

在为泛型服务的IDL中添加流式方法的定义并重新编译后,发现一元调用时(UNARY)不能正确的选择GenericServiceImple 的 generic 方法,而选择调用了 genericBiStream 方法。

断点调试可以发现,底层 ServerMethodDefinition 和 CallHandler 的绑定错误,使得方法选择了ID错误的CallHandler,从而调用了错误的方法。ServerMethodDefinition 在 ServerImpl.runInternal() 中通过 registry.lookupMethod(methodName)获取。实际调用的是 MutableHandlerRegistry,通过其中的 services 获取 ServerServiceDefinition。而 service 中的 ServerServiceDefinition 由 TripleServer.registerProcessor 放入,自此回到了 SofaRPC 自己的实现。

//TripleServer.registerProcessor
...
ServerServiceDefinition serviceDefinition = getServerServiceDefinition(providerConfig, uniqueIdInvoker);
            this.serviceInfo.put(providerConfig, serviceDefinition);
...

分析数据流,可以发现将客户端实际服务方法和泛型服务方法绑定起来的操作发生在TripleServer.buildSofaServiceDef方法中。该流程实际在服务提供者启动,导出服务接口时进行。

private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,ProviderConfig providerConfig) {
        ServerServiceDefinition templateDefinition = genericService.bindService();
        ServerCallHandler<Request, Response> templateHandler = (ServerCallHandler<Request, Response>) templateDefinition
            .getMethods().iterator().next().getServerCallHandler(); //这里只拿到了泛型服务中的第一个方法,没有适配新的流调用方法。
        List<MethodDescriptor<Request, Response>> methodDescriptor = getMethodDescriptor(providerConfig);
        List<ServerMethodDefinition<Request, Response>> methodDefs = getMethodDefinitions(templateHandler,//此处使用的 TemplateHandler 应随方法调用方式变化
methodDescriptor);
        ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(getServiceDescriptor( //应该为将实际服务与泛型服务中的特定方法绑定起来
            templateDefinition, providerConfig, methodDescriptor));
        for (ServerMethodDefinition<Request, Response> methodDef : methodDefs) {
            builder.addMethod(methodDef);
        }
        return builder.build();
    }

修改后的实现:

   private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,
                                                        ProviderConfig providerConfig) {
        ServerServiceDefinition templateDefinition = genericService.bindService();
        List<MethodDescriptor<Request, Response>> methodDescriptor = getMethodDescriptor(providerConfig);//拿实际方法的 Descriptor
        List<ServerMethodDefinition<Request, Response>> methodDefs = mapMethodHandler(templateDefinition,methodDescriptor);
        ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(getServiceDescriptor( 
            templateDefinition, providerConfig, methodDescriptor));
        for (ServerMethodDefinition<Request, Response> methodDef : methodDefs) {
            builder.addMethod(methodDef);
        }
        return builder.build();
    }
//将实际服务与泛型服务中的特定方法绑定起来
   private List<ServerMethodDefinition<Request,Response>>createMethodDefainition(ServerServiceDefinition geneticServiceDefinition, List<MethodDescriptor<Request, Response>> serviceMethods){
            Collection<ServerMethodDefinition<?, ?>> genericServiceMethods = geneticServiceDefinition.getMethods();
            List<ServerMethodDefinition<Request,Response>> serverMethodDefinitions = new ArrayList<>();
            //Map ture service method to certain generic service method.
            for (ServerMethodDefinition<?,?> genericMethods : genericServiceMethods){
                for(MethodDescriptor<Request,Response> methodDescriptor : serviceMethods){

                    if(methodDescriptor.getType().equals(genericMethods.getMethodDescriptor().getType())){

                        ServerMethodDefinition<Request,Response> genericMeth = (ServerMethodDefinition<Request, Response>) genericMethods;

                        serverMethodDefinitions.add(
                                ServerMethodDefinition.create(methodDescriptor, genericMeth.getServerCallHandler())
                        );
                    }
                }
            }
            return serverMethodDefinitions;
    }

这样 ServerMethodDefinition 就和 callHandler 正确的绑定起来了。

方法定位问题

以上方法传入的 ProviderConfig 提供了实际服务接口的配置信息,包括这些方法的调用方式。但是调用方式默认被设置为UNARY。在上层需要附加一部分根据接口方法参数判断调用类型的逻辑。

//TripleServer
private List<MethodDescriptor<Request, Response>> getMethodDescriptor(ProviderConfig providerConfig) {
...
            MethodDescriptor<Request, Response> methodDescriptor = MethodDescriptor.<Request, Response>newBuilder()
                    .setType(MethodDescriptor.MethodType.UNARY) //默认固定为一元调用
                    .setFullMethodName(generateFullMethodName(providerConfig.getInterfaceId(), name))
                    .setSampledToLocalTracing(true)
                    .setRequestMarshaller(ProtoUtils.marshaller(
                            Request.getDefaultInstance()))
                    .setResponseMarshaller(ProtoUtils.marshaller(
                            Response.getDefaultInstance()))
                    .build();
            result.add(methodDescriptor);
...
        return result;
    }

修改后:

    private List<MethodDescriptor<Request, Response>> getMethodDescriptor(ProviderConfig providerConfig) {
        List<MethodDescriptor<Request, Response>> result = new ArrayList<>();
        Set<String> methodNames = SofaProtoUtils.getMethodNames(providerConfig.getInterfaceId());

        for (String name : methodNames) {
            //根据方法名获取调用模式
            MethodDescriptor.MethodType methodType = SofaProtoUtils.mapGrpcCallType(providerConfig.getMethodCallType(name));
            MethodDescriptor<Request, Response> methodDescriptor = MethodDescriptor.<Request, Response>newBuilder()
                    .setType(methodType)
                    .setFullMethodName(generateFullMethodName(providerConfig.getInterfaceId(), name))
                    .setSampledToLocalTracing(true)
                    .setRequestMarshaller(ProtoUtils.marshaller(
                            Request.getDefaultInstance()))
                    .setResponseMarshaller(ProtoUtils.marshaller(
                            Response.getDefaultInstance()))
                    .build();
            result.add(methodDescriptor);
        }
        return result;
    }

在接口配置(AbstractInterfaceConfig)中提供了按Class匹配并缓存方法调用模式的方法:

//AbstractInterfaceConfig
...
    /**
     * MethodCallType ( Map<method.getName(),callType> )
     */
    protected transient volatile Map<String,String> methodCallType = null;
...
     /**
     * Cache the call type of interface methods
     */
    protected void loadMethodCallType(Class<?> interfaceClass){
        Method[] methods =  interfaceClass.getDeclaredMethods();
        this.methodCallType = new ConcurrentHashMap<>();
        for(Method method :methods) {
            //此处调用的是全局处理中提到的 MethodConfig 的新方法,根据方法参数匹配调用方法
 methodCallType.put(method.getName(),MethodConfig.mapStreamType(method,RpcConstants.INVOKER_TYPE_SYNC));
        }
    }

    public String getMethodCallType(String methodName) {
        return methodCallType.get(methodName);
    }

*写在 AbstractInterfaceConfig 是考虑消费者是否能复用这部分逻辑

在 ProviderConfig 初始化时,设置服务引用时会尝试匹配服务方法的具体调用方式,并缓存:

//ProviderConfig
    /**
     * Sets ref.
     *
     * @param ref the ref
     * @return the ref
     */
    public ProviderConfig<T> setRef(T ref) {
        this.ref = ref;
        //加载方法调用方式、缓存
        loadMethodCallType(ref.getClass());
        return this;
    }

流信息序列化问题

对于双向流和客户端流,接口声明时的入参和返回值均仅为 StreamHandler。这导致无论是客户端请求和服务端响应都无法在调用开始就得知具体的消息类型。因此获取、缓存具体消息类型、序列化器的操作需要延迟在客户端/服务端第一次得到具体请求时再进行。

客户端向服务端发送消息使用的StreamHandler适配器,TripleClientInvoker中的匿名实现,将请求序列化为Request:

   StreamHandler<Request> handler = new StreamHandler() {
                    @Override
                    public void onMessage(Object message) {
                        SofaRequest request = MessageBuilder.copyEmptyRequest(sofaRequest);
                        Object[] args = new Object[]{message};
                        request.setMethodArgs(args);
                        request.setMethodArgSigs(rebuildTrueRequestArgSigs(args));
                        Request req = getRequest(request,serialization,serializer);
                        observer.onNext(req);
                    }

                    @Override
                    public void onFinish() {
                        observer.onCompleted();
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        observer.onError(throwable);
                    }
                };

客户端处理服务端响应使用的StreamHandler适配器,将Response反序列化为指定类型:

public class ClientStreamObserverAdapter implements StreamObserver<triple.Response> {

    private final StreamHandler<Object> streamHandler;

    private final Serializer serializer;

    private volatile Class<?> returnType;

    public ClientStreamObserverAdapter(StreamHandler<Object> streamHandler, byte serializeType) {
        this.streamHandler = streamHandler;
        this.serializer = SerializerFactory.getSerializer(serializeType);
    }

    @Override
    public void onNext(triple.Response response) {
        byte[] responseDate = response.getData().toByteArray();
        Object appResponse = null;
        String returnTypeName = response.getType();
        if (responseDate != null && responseDate.length > 0) {
            if(returnType == null && !returnTypeName.isEmpty()) {
                try {
                    returnType = Class.forName(returnTypeName);
                }catch (ClassNotFoundException e){
                    throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE,"Can not find return type :"+returnType);
                }
            }
            appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);
        }

        streamHandler.onMessage(appResponse);
    }

    @Override
    public void onError(Throwable t) {
        streamHandler.onException(t);
    }

    @Override
    public void onCompleted() {
        streamHandler.onFinish();
    }
}

服务端发送响应时使用的StreamHandler适配器,将原始响应序列化为Response:

public class ResponseSerializeStreamHandler<T> implements StreamHandler<T> {

    private final StreamObserver<triple.Response> streamObserver;

    private Serializer serializer;

    private String serializeType;

    public ResponseSerializeStreamHandler(StreamObserver<triple.Response> streamObserver,String serializeType) {
        this.streamObserver = streamObserver;
        serializer = SerializerFactory.getSerializer(serializeType);
        this.serializeType = serializeType;
    }

    @Override
    public void onMessage(T message) {
        Response.Builder builder = Response.newBuilder();
        builder.setType(message.getClass().getName());
        builder.setSerializeType(serializeType);
        builder.setData(ByteString.copyFrom(serializer.encode(message, null).array()));

        streamObserver.onNext(builder.build());
    }

    @Override
    public void onFinish() {
        streamObserver.onCompleted();
    }

    @Override
    public void onException(Throwable throwable) {
        streamObserver.onError(throwable);
    }

}

服务端处理客户端请求使用的StreamHandler,TripleClientInvoker 中的匿名实现,将请求反序列化为指定类型:

return new StreamObserver<Request>() {
                volatile Serializer serializer = null;

                volatile Class<?>[] argTypes = null;

                @Override
                public void onNext(Request request) {
                    checkInitialize(request);
                    Object message = getInvokeArgs(request, argTypes, serializer)[0];
                    clientHandler.onMessage(message);
                }

                private void checkInitialize(Request request){
                    if (serializer == null && argTypes == null) {
                        synchronized (this) {
                            if (serializer == null && argTypes == null) {
                                serializer = SerializerFactory.getSerializer(request.getSerializeType());
                                argTypes = getArgTypes(request);
                            }
                        }
                    }
                }

                @Override
                public void onError(Throwable t) {
                    clientHandler.onException(t);
                }

                @Override
                public void onCompleted() {
                    clientHandler.onFinish();
                }
            };

Summary by CodeRabbit

sofastack-cla[bot] commented 10 months ago

Hi @namelessssssssssss, welcome to SOFAStack community, Please sign Contributor License Agreement!

After you signed CLA, we will automatically sync the status of this pull request in 3 minutes.

namelessssssssssss commented 10 months ago

@EvenLjj PTAL :)

codecov[bot] commented 10 months ago

Codecov Report

Attention: Patch coverage is 83.66013% with 50 lines in your changes are missing coverage. Please review.

Project coverage is 72.19%. Comparing base (e67ea54) to head (df27427). Report is 8 commits behind head on master.

:exclamation: Current head df27427 differs from pull request most recent head c7435df. Consider uploading reports for the commit c7435df to get more accurate results

Files Patch % Lines
...pay/sofa/rpc/server/triple/GenericServiceImpl.java 75.51% 18 Missing and 6 partials :warning:
...sofa/rpc/transport/triple/TripleClientInvoker.java 89.41% 5 Missing and 4 partials :warning:
.../java/com/alipay/sofa/rpc/config/MethodConfig.java 61.53% 2 Missing and 3 partials :warning:
...age/triple/stream/ClientStreamObserverAdapter.java 80.00% 2 Missing and 2 partials :warning:
...java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java 50.00% 2 Missing and 1 partial :warning:
...om/alipay/sofa/rpc/utils/TripleExceptionUtils.java 25.00% 2 Missing and 1 partial :warning:
...va/com/alipay/sofa/rpc/client/AbstractCluster.java 50.00% 0 Missing and 2 partials :warning:
Additional details and impacted files ```diff @@ Coverage Diff @@ ## master #1360 +/- ## ============================================ + Coverage 72.04% 72.19% +0.15% - Complexity 791 793 +2 ============================================ Files 422 425 +3 Lines 17814 18052 +238 Branches 2768 2801 +33 ============================================ + Hits 12834 13033 +199 - Misses 3565 3590 +25 - Partials 1415 1429 +14 ```

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

nobodyiam commented 10 months ago

This is interesting 😀

OrezzerO commented 7 months ago

更好的方案是使用自定义的 StreamObserver(此处先叫做StreamHandler),将其和gRPC解耦,在 Triple 传输层通过适配器转换为gRPC StreamObserver。此处使用自定义的StreamHandler。

确实是这样的, 因此方案中还需要考虑如何让 bolt 协议来支持流式处理. 我能想到两种方案:

  1. 在 bolt 协议中支持流式处理. 目前 bolt 协议用的是ping-pong 的方式只支持一来一回的应答模式,需要改动底层协议才能支持流式.
  2. 通讯层(grpc/bolt)都采用 ping-pong 的模式, 在 RPC 之中模拟流式处理. 这种方案的好处是可以兼容各种通讯层协议; 缺点是性能不是最优的, 有比较大的浪费.

我个人比较希望采用第一种方案,让bolt 原生支持 stream ,这能让使用bolt 的其他框架,也能享受到 stream 的优势.

回到这个 PR 上, 我建议统筹考虑 bolt/sofarpc 应该如何支持 stream ,再来合并这个PR.

coderabbitai[bot] commented 3 months ago

Walkthrough

The updates involve enriching RPC functionalities with added support for various streaming types (unary, client, server, and bidirectional). Changes include adjusting method signatures to accommodate streaming, introducing new utilities for handling streaming and exceptions, and expanding test cases to ensure robustness. This overhaul not only streamlines method invocations but also extends the RPC framework's capabilities to effectively handle complex streaming scenarios.

Changes

Files Change Summary
RpcConstants.java, AbstractInterfaceConfig.java, ConsumerConfig.java, ProviderConfig.java Added constants for new streaming types, caching method call types, and updated method configurations.
MethodConfig.java, AbstractCluster.java, TripleClientInvoker.java Enhanced handling of different streaming scenarios, updated timeout behaviors, and improved error handling.
MessageBuilder.java, StreamHandler.java, SofaStreamObserver.java Introduced new methods for message handling and stream observation.
ConsumerGenericFilter.java, UniqueIdInvoker.java, TripleServer.java Updated method signatures to accept SofaRequest, improved method identification, and enhanced stream handling.
GenericServiceImpl.java, SofaProtoUtils.java Expanded service functionalities, added utility methods for streaming, and updated method mapping utilities.
transformer.proto, helloworld.proto Added new RPC methods supporting various streaming types.
HelloService.java, HelloServiceImpl.java, ServerResponse.java, ClientRequest.java, ExtendClientRequest.java Defined new interfaces and classes for handling streaming in test scenarios.
TripleGenericStreamTest.java, TripleStubStreamTest.java Introduced comprehensive test cases for bi-directional and server streaming.

🐇✨📜 O hark! New streams do flow through code, With swift, smooth currents, they now bode. From unary to bidirectional streams, Through SOFA's veins, the data gleams. A rabbit’s cheer for the tech brigade, For each clever tweak and upgrade made! 🌟🐾


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share - [X](https://twitter.com/intent/tweet?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A&url=https%3A//coderabbit.ai) - [Mastodon](https://mastodon.social/share?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A%20https%3A%2F%2Fcoderabbit.ai) - [Reddit](https://www.reddit.com/submit?title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&text=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code.%20Check%20it%20out%3A%20https%3A//coderabbit.ai) - [LinkedIn](https://www.linkedin.com/sharing/share-offsite/?url=https%3A%2F%2Fcoderabbit.ai&mini=true&title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&summary=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code)
Tips ### Chat There are 3 ways to chat with [CodeRabbit](https://coderabbit.ai): - Review comments: Directly reply to a review comment made by CodeRabbit. Example: - `I pushed a fix in commit .` - `Generate unit testing code for this file.` - `Open a follow-up GitHub issue for this discussion.` - Files and specific lines of code (under the "Files changed" tab): Tag `@coderabbitai` in a new review comment at the desired location with your query. Examples: - `@coderabbitai generate unit testing code for this file.` - `@coderabbitai modularize this function.` - PR comments: Tag `@coderabbitai` in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples: - `@coderabbitai generate interesting stats about this repository and render them as a table.` - `@coderabbitai show all the console.log statements in this repository.` - `@coderabbitai read src/utils.ts and generate unit testing code.` - `@coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.` Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. ### CodeRabbit Commands (invoked as PR comments) - `@coderabbitai pause` to pause the reviews on a PR. - `@coderabbitai resume` to resume the paused reviews. - `@coderabbitai review` to trigger a review. This is useful when automatic reviews are disabled for the repository. - `@coderabbitai resolve` resolve all the CodeRabbit review comments. - `@coderabbitai help` to get help. Additionally, you can add `@coderabbitai ignore` anywhere in the PR description to prevent this PR from being reviewed. ### CodeRabbit Configration File (`.coderabbit.yaml`) - You can programmatically configure CodeRabbit by adding a `.coderabbit.yaml` file to the root of your repository. - Please see the [configuration documentation](https://docs.coderabbit.ai/guides/configure-coderabbit) for more information. - If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: `# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json` ### Documentation and Community - Visit our [Documentation](https://coderabbit.ai/docs) for detailed information on how to use CodeRabbit. - Join our [Discord Community](https://discord.com/invite/GsXnASn26c) to get help, request features, and share feedback. - Follow us on [X/Twitter](https://twitter.com/coderabbitai) for updates and announcements.