sofastack / sofa-jraft

A production-grade Java implementation of the Raft consensus algorithm.
https://www.sofastack.tech/projects/sofa-jraft/
Apache License 2.0
3.52k stars 1.12k forks

AssertionError in AppendEntriesRequestProcessor #1091

Open Cyrill opened 3 months ago

Cyrill commented 3 months ago

I managed to hit an AssertionError in AppendEntriesRequestProcessor. Apparently, there is a race. The crash was observed on a custom branch, though the code in master is the same.

First, the code:

AppendEntriesRequestProcessor.PeerExecutorSelector has the following code (unrelated lines intentionally removed):

public Executor select(final String reqClass, final Object reqHeader) {
    // ...

    final Node node = NodeManager.getInstance().get(groupId, peer);

    if (node == null || !node.getRaftOptions().isReplicatorPipeline()) {
        return executor();
    }

    // The node enable pipeline, we should ensure bolt support it.
    RpcFactoryHelper.rpcFactory().ensurePipeline();

    final PeerRequestContext ctx = getOrCreatePeerRequestContext(groupId, pairOf(peerId, serverId), null);

    return ctx.executor;
}

getOrCreatePeerRequestContext looks as follows:

PeerRequestContext getOrCreatePeerRequestContext(final String groupId, final PeerPair pair, final Connection conn) {
    ConcurrentMap<PeerPair, PeerRequestContext> groupContexts = this.peerRequestContexts.get(groupId);
    // ....

    PeerRequestContext peerCtx = groupContexts.get(pair);
    if (peerCtx == null) {
        synchronized (Utils.withLockObject(groupContexts)) {
            peerCtx = groupContexts.get(pair);
            // double check in lock
            if (peerCtx == null) {
                // only one thread to process append entries for every jraft node
                final PeerId peer = new PeerId();
                final boolean parsed = peer.parse(pair.local);
                assert (parsed);
                final Node node = NodeManager.getInstance().get(groupId, peer);
                assert (node != null); // <<<<<<<<<<<<<< AssertionError here!
                peerCtx = new PeerRequestContext(groupId, pair, node.getRaftOptions()
                    .getMaxReplicatorInflightMsgs());
                groupContexts.put(pair, peerCtx);
            }
        }
    }
    // ...

    return peerCtx;
}
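
The two snippets above look the node up twice: once in select and again inside getOrCreatePeerRequestContext. A minimal, single-threaded sketch of why that is a check-then-act race follows. Everything in it is hypothetical: DoubleLookupRace, registry and the returned strings are stand-ins for illustration, not jraft APIs, and the betweenLookups callback simulates another thread removing the node during the delay.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the node registry; this is NOT the jraft NodeManager.
class DoubleLookupRace {
    static final ConcurrentMap<String, Object> registry = new ConcurrentHashMap<>();

    // Mirrors the shape of select(): one lookup, then a helper that looks up again.
    static String select(String key, Runnable betweenLookups) {
        Object node = registry.get(key);       // first lookup (as in select)
        if (node == null) {
            return "default-executor";
        }
        betweenLookups.run();                  // simulated delay; a shutdown may remove the node here
        Object again = registry.get(key);      // second lookup (as in getOrCreatePeerRequestContext)
        if (again == null) {
            return "node-vanished";            // the original code reaches assert (node != null) here
        }
        return "peer-executor";
    }

    public static void main(String[] args) {
        registry.put("group/peer", new Object());
        // No interference: both lookups see the node.
        System.out.println(select("group/peer", () -> { }));
        // The node is removed between the two lookups.
        System.out.println(select("group/peer", () -> registry.remove("group/peer")));
        // The node is already gone before the first lookup.
        System.out.println(select("group/peer", () -> { }));
    }
}
```

Only the middle call corresponds to the crash: the first lookup succeeds, the node disappears, and the second lookup returns null.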

Execution flow

I don't have specific code to reproduce this issue, but the flow is simple. I observed a slight delay in messaging/threads that ended in this error.

My assumptions regarding the execution flow are:

fengjiachun commented 3 months ago

I agree with you; that situation could definitely happen. I suspect the problem arises from sharing the RpcServer: the RaftGroupService closes before the RpcServer does, so a request whose processing is delayed can still be handled after the raft service has already been closed.
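
If requests can still arrive after the raft service is closed, the request path has to tolerate a missing node. One possible shape, shown purely as a sketch and not as the project's fix, is to resolve the node once and reuse that value when creating the per-peer context. All names here (SingleLookupSelector, nodes, peerExecutors, PeerExecutor) are hypothetical stand-ins, and an Integer stands in for the node's max in-flight setting.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Hypothetical sketch; none of these names are jraft APIs.
class SingleLookupSelector {
    // Stand-in for the node registry: key -> per-node setting (e.g. max in-flight messages).
    final ConcurrentMap<String, Integer> nodes = new ConcurrentHashMap<>();
    // Stand-in for the per-peer request contexts.
    final ConcurrentMap<String, Executor> peerExecutors = new ConcurrentHashMap<>();
    // Fallback used when no node is registered; runs tasks on the calling thread.
    final Executor defaultExecutor = Runnable::run;

    Executor select(String key) {
        final Integer maxInflight = nodes.get(key);   // the only registry lookup
        if (maxInflight == null) {
            return defaultExecutor;                   // node missing or already shut down
        }
        // The value captured above is reused, so a concurrent removal
        // cannot turn into a null inside the creation path.
        return peerExecutors.computeIfAbsent(key, k -> new PeerExecutor(maxInflight));
    }

    // Placeholder per-peer executor; a real context would own a dedicated thread.
    static final class PeerExecutor implements Executor {
        final int maxInflight;

        PeerExecutor(int maxInflight) {
            this.maxInflight = maxInflight;
        }

        @Override
        public void execute(Runnable task) {
            task.run();
        }
    }
}
```

This sketch deliberately leaves stale entries in peerExecutors after a node is removed; cleaning them up on shutdown would be a separate concern.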