sofastack / sofa-jraft

A production-grade Java implementation of the Raft consensus algorithm.
https://www.sofastack.tech/projects/sofa-jraft/
Apache License 2.0

How does the leader handle an ErrorResponse returned by a follower? #1074

Closed. veryjing closed this issue 4 months ago

veryjing commented 4 months ago

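When the leader replicates log entries to a follower and the follower detects a gap in its log, it replies to the leader with an ErrorResponse. How is the leader supposed to handle that response?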

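While reading the source, I found that when a follower receives log entries from the leader, it calls checkAndResolveConflict to resolve log conflicts. Inside checkAndResolveConflict, the following lines are executed: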

if (firstLogEntry.getId().getIndex() > this.lastLogIndex + 1) {
    ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.EINVAL,
        "There's gap between first_index=%d and last_log_index=%d",
        firstLogEntry.getId().getIndex(), this.lastLogIndex));
    return false;
}
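As a standalone illustration of the condition in that snippet, here is a minimal sketch of the gap test on its own. The class and method names (`LogGapCheckSketch`, `hasGap`) are hypothetical and are not part of sofa-jraft; only the comparison itself is taken from the code quoted above.

```java
// Hypothetical helper, not sofa-jraft code: isolates the comparison used in
// the quoted checkAndResolveConflict snippet.
class LogGapCheckSketch {

    /**
     * Returns true when appending entries that start at firstIncomingIndex
     * would leave at least one missing index after lastLogIndex.
     */
    static boolean hasGap(long firstIncomingIndex, long lastLogIndex) {
        return firstIncomingIndex > lastLogIndex + 1;
    }

    public static void main(String[] args) {
        // Follower's last index is 10; entries starting at 11 are contiguous.
        System.out.println(hasGap(11, 10)); // prints "false"
        // Entries starting at 13 would skip indexes 11 and 12.
        System.out.println(hasGap(13, 10)); // prints "true"
    }
}
```

Entries that start at or below the follower's last index do not count as a gap under this test; they go down the overlap path, which the quoted snippet does not show.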

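Those lines from checkAndResolveConflict handle the log-gap case: the follower replies to the leader with an ErrorResponse. Once the leader receives that response, its type is checked in the following method: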

rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {

            @SuppressWarnings({ "unchecked", "ConstantConditions" })
            @Override
            public void complete(final Object result, final Throwable err) {
                if (future.isCancelled()) {
                    onCanceled(request, done);
                    return;
                }

                if (err == null) {
                    Status status = Status.OK();
                    Message msg;
                    // Check here whether the reply is an ErrorResponse
                    if (result instanceof ErrorResponse) {
                        // Set the error status
                        status = handleErrorResponse((ErrorResponse) result);
                        msg = (Message) result;
                    } else if (result instanceof Message) {
                        final Descriptors.FieldDescriptor fd = ((Message) result).getDescriptorForType() //
                            .findFieldByNumber(RpcResponseFactory.ERROR_RESPONSE_NUM);
                        if (fd != null && ((Message) result).hasField(fd)) {
                            final ErrorResponse eResp = (ErrorResponse) ((Message) result).getField(fd);
                            status = handleErrorResponse(eResp);
                            msg = eResp;
                        } else {
                            msg = (T) result;
                        }
                    } else {
                        msg = (T) result;
                    }
                    if (done != null) {
                        try {
                            if (status.isOk()) {
                                done.setResponse((T) msg);
                            } // Execution continues with the next line; run() invokes the callback
                            // that was created when the log entries were sent, i.e. onRpcReturned
                            done.run(status);
                        } catch (final Throwable t) {
                            LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
                        }
                    }
                    if (!future.isDone()) {
                        future.setResult(msg);
                    }
                } else {
                    if (done != null) {
                        try {
                            done.run(new Status(err instanceof InvokeTimeoutException ? RaftError.ETIMEDOUT
                                : RaftError.EINTERNAL, "RPC exception:" + err.getMessage()));
                        } catch (final Throwable t) {
                            LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
                        }
                    }
                    if (!future.isDone()) {
                        future.failure(err);
                    }
                }
            }

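The comments in the code above trace the execution path. My confusion is this: after the follower replies with an ErrorResponse, the leader's onRpcReturned method does not receive that ErrorResponse, because the line done.setResponse((T) msg) is never executed. Moreover, judging by the generic type parameter of RpcResponseClosureAdapter, onRpcReturned seems able to handle only AppendEntriesResponse. So how is the ErrorResponse handled? How does the leader learn that there is a gap between its log and the follower's? I also noticed that the problem does not arise when GrpcResponseFactory is used, because GrpcResponseFactory wraps the ErrorResponse inside an AppendEntriesResponse.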
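The dispatch described in this question can be reduced to a small, self-contained sketch. Everything in it (`ErrorReply`, `AppendReply`, `SimpleStatus`, `ResponseClosure`, `complete`, `deliver`) is a hypothetical stand-in for the corresponding sofa-jraft types, and the error code 1 is arbitrary. It only mirrors the control flow of the quoted callback: an error reply is turned into a non-OK status, the typed response is left unset, and the closure still runs.

```java
// Hypothetical stand-ins, not sofa-jraft classes: a reduced model of the
// quoted complete(...) callback.
class ErrorDispatchSketch {

    /** Stand-in for ErrorResponse. */
    static final class ErrorReply {
        final int code;
        final String msg;
        ErrorReply(int code, String msg) { this.code = code; this.msg = msg; }
    }

    /** Stand-in for a typed response such as AppendEntriesResponse. */
    static final class AppendReply {
        final boolean success;
        AppendReply(boolean success) { this.success = success; }
    }

    /** Stand-in for Status; code 0 means OK. */
    static final class SimpleStatus {
        final int code;
        final String msg;
        SimpleStatus(int code, String msg) { this.code = code; this.msg = msg; }
        boolean isOk() { return code == 0; }
    }

    /** Stand-in for a response closure with a typed response slot. */
    abstract static class ResponseClosure<T> {
        private T response;
        void setResponse(T response) { this.response = response; }
        T getResponse() { return response; }
        abstract void run(SimpleStatus status);
    }

    /** An error reply becomes a status; only an OK result fills the response slot. */
    @SuppressWarnings("unchecked")
    static <T> void complete(Object result, ResponseClosure<T> done) {
        SimpleStatus status = new SimpleStatus(0, "OK");
        if (result instanceof ErrorReply) {
            ErrorReply e = (ErrorReply) result;
            status = new SimpleStatus(e.code, e.msg);
        }
        if (status.isOk()) {
            done.setResponse((T) result);
        }
        done.run(status); // runs in both cases; the status carries the error
    }

    /** Runs complete(...) with a recording closure and reports what the closure saw. */
    static String deliver(Object result) {
        final StringBuilder seen = new StringBuilder();
        ResponseClosure<AppendReply> done = new ResponseClosure<AppendReply>() {
            @Override
            void run(SimpleStatus status) {
                seen.append("code=").append(status.code)
                    .append(", hasResponse=").append(getResponse() != null);
            }
        };
        complete(result, done);
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(deliver(new ErrorReply(1, "log gap"))); // prints "code=1, hasResponse=false"
        System.out.println(deliver(new AppendReply(true)));        // prints "code=0, hasResponse=true"
    }
}
```

In this model the closure never sees the error reply object itself. The error reaches it only through the status argument, which is why the response slot is empty on the error path.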

veryjing commented 4 months ago

Found the answer. It turns out I had misread the code: onRpcReturned performs a type cast on the response, which I had overlooked all along.
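One way to read that resolution, offered as an assumption rather than a description of the actual sofa-jraft code: a callback that receives an untyped response can cast it to the expected type without any risk on the error path, because casting `null` to a reference type is always legal in Java, and the error is reported through the status instead. The sketch below uses hypothetical names (`ReturnedHandlerSketch`, `AppendReply`, `onReturned`) and hypothetical result strings.

```java
// Hypothetical illustration, not the sofa-jraft Replicator: a callback that
// casts an untyped response and checks the status before using it.
class ReturnedHandlerSketch {

    /** Stand-in for AppendEntriesResponse. */
    static final class AppendReply {
        final boolean success;
        AppendReply(boolean success) { this.success = success; }
    }

    /**
     * statusCode 0 means OK. On the error path the response is expected to be
     * null, and (AppendReply) null is a legal cast that yields null.
     */
    static String onReturned(int statusCode, Object response) {
        AppendReply reply = (AppendReply) response;
        if (statusCode != 0) {
            // The response is never dereferenced here; only the status is used.
            return "retry (status " + statusCode + ")";
        }
        return reply.success ? "advance" : "back off";
    }

    public static void main(String[] args) {
        System.out.println(onReturned(1, null));                   // prints "retry (status 1)"
        System.out.println(onReturned(0, new AppendReply(true)));  // prints "advance"
        System.out.println(onReturned(0, new AppendReply(false))); // prints "back off"
    }
}
```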