kardiachain / go-kardia

Golang implementation of KardiaChain Decentralize Network
https://kardiachain.io
GNU Lesser General Public License v3.0
33 stars 18 forks source link

[Feature Request] Blockchain reactor for fast sync purpose #131

Closed trinhdn97 closed 3 years ago

trinhdn97 commented 3 years ago

Blockchain reactor implementation details

The reactor will include a demultiplexing routine which will send each message to each sub routine for independent processing. The fast sync protocol logic is decoupled from IO by using three concurrent threads of execution: a scheduler, a processor, and a demuxer. The demuxRoutine acts as "pacemaker" setting the time in which events are expected to be handled. It is responsible for translating between internal events and network IO messages, and for routing events between components. Both the scheduler and processor are structured as finite state machines with input and output events. Input events are received on an unbounded priority queue, with higher priority for error events. Output events are emitted on a blocking, bounded channel. Network IO is handled by the KardiaChain p2p subsystem, where messages are sent in a non-blocking manner.

// Takes the channel as a parameter to avoid race conditions on r.events.
func (r *BlockchainReactor) demux(events <-chan Event) {
    var (
        scheduleFreq = 20 * time.Millisecond
        doScheduleCh = make(chan struct{}, 1)
        doScheduleTk = time.NewTicker(scheduleFreq)
    )
    defer doScheduleTk.Stop()
        ...
    for {
        select {
            case <-doScheduleTk.C:
                    select {
                    case doScheduleCh <- struct{}{}:
                    default:
                    }
                        case <-doScheduleCh:
                    r.scheduler.send(rTrySchedule{time: time.Now()})

                        // Events from peers. Closing the channel signals event loop termination.
                case event, ok := <-events:
                        ...

            // Incremental events from scheduler
            case event := <-r.scheduler.next():
            ...

            // Incremental events from processor
            case event := <-r.processor.next():
            ...

            // Terminal event from processor
            case err := <-r.processor.final():
            ...
        }
    }
}

The IO component is responsible for exchanging (sending and receiving) fast sync protocol messages with peers. There is one send and one receive routine per peer.

Life cycle management

A set of routines for individual processes allow processes to run in parallel with clear life cycle management. Start, Stop, and AddPeer hooks currently present in the reactor will delegate to the sub-routines allowing them to manage internal state independent without further coupling to the reactor.

func (r *BlockChainReactor) Start() {
    r.events = make(chan Event, chBufferSize)
    go r.scheduler.start()
    go r.processor.start()
        go r.demux(r.events)
    ...
}

func (bcR *BlockchainReactor) Receive(...) {
    ...
    r.msgs <- msg
    ...
}

func (r *BlockchainReactor) Stop() {
    ...
    r.msgs <- stop
    ...
}

...

func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
    ...
    r.msgs <- bcAddPeerEv{peer.ID}
    ...
}

IO handling

An IO handling routine within the reactor will isolate peer communication. Message going through the ioRoutine will usually be one way, using p2p APIs. In the case in which the p2p API such as trySend return errors, the ioRoutine can funnel those message back to the demuxRoutine for distribution to the other routines. For instance errors from the ioRoutine can be consumed by the scheduler to inform better peer selection implementations.

func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height uint64) error {
    ...
    msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: height})
    if err != nil {
        return err
    }
    queued := peer.TrySend(BlockchainChannel, msgBytes)
    if !queued {
        return fmt.Errorf("send queue full")
    }
    return nil
}

func (sio *switchIO) sendStatusResponse(base uint64, height uint64, peerID p2p.ID) error {
        ...
    msgBytes, err := EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
    if err != nil {
        return err
    }
    if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
        return fmt.Errorf("peer queue full")
    }
    return nil
}

func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
    ...
    msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bpb})
    if err != nil {
        return err
    }
    if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
        return fmt.Errorf("peer queue full")
    }
    return nil
}

func (sio *switchIO) sendBlockNotFound(height uint64, peerID p2p.ID) error {
    peer := sio.sw.Peers().Get(peerID)
    if peer == nil {
        return fmt.Errorf("peer not found")
    }
    msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: height})
    if err != nil {
        return err
    }

    if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
        return fmt.Errorf("peer queue full")
    }

    return nil
}

func (sio *switchIO) trySwitchToConsensus(state cstate.LastestBlockState, skipWAL bool) bool {
    conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor)
    if ok {
        conR.SwitchToConsensus(state, skipWAL)
    }
    return ok
}

func (sio *switchIO) broadcastStatusRequest() error {
    ...
    sio.sw.Broadcast(BlockchainChannel, msgBytes)
    return nil
}

Processor

The processor is responsible for ordering, verifying and executing blocks. The Processor will maintain an internal map queue referring to the blocks waiting to be processed. As a set of blocks arrive unordered, the Processor will check if it has height+1 necessary to process the next block. The processor also maintains the interface processorContext in order to verify, apply and save new blocks.

type pcState struct {
    // blocks waiting to be processed
    queue blockQueue

    // draining indicates that the next rProcessBlock event with a queue miss constitutes completion
    draining bool

    // the number of blocks successfully synced by the processor
    blocksSynced int

    // the processorContext which contains the processor dependencies
    context processorContext
}

type processorContext interface {
    applyBlock(blockID types.BlockID, block *types.Block) error
    verifyCommit(chainID string, blockID types.BlockID, height uint64, commit *types.Commit) error
    saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
    kaiState() cstate.LastestBlockState
    setState(cstate.LastestBlockState)
}

In Kardia blockchain, the commit for block (signed votes messages) h is contained in block h+1, and thus a node performing fast sync must receive two sequential blocks before it can verify fully the first one. If verification succeeds, the first block is accepted; if it fails, both blocks are rejected, since it is not known which block was faulty. When the node rejects a block, it suspects the sending peer of being faulty and evicts this peer from the set of peers. The same happens when a peer does not reply within a predefined time interval.

// nextTwo returns the next two unverified blocks
func (state *pcState) nextTwo() (queueItem, queueItem, error) {
    if first, ok := state.queue[state.height()+1]; ok {
        if second, ok := state.queue[state.height()+2]; ok {
            return first, second, nil
        }
    }
    return queueItem{}, queueItem{}, fmt.Errorf("not found")
}

func (state *pcState) handle(event Event) (Event, error) {
        switch event := event.(type) {
        case rProcessBlock:
        // verify if +second+ last commit "confirms" +first+ block
        err = state.context.verifyCommit(kaiState.ChainID, firstID, first.Height(), second.LastCommit())
        if err != nil {
            state.purgePeer(firstItem.peerID)
            if firstItem.peerID != secondItem.peerID {
                state.purgePeer(secondItem.peerID)
            }
            return pcBlockVerificationFailure{
                    height: first.Height(), firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID},
                nil
        }
...

Furthermore, it informs the scheduler whether a block processing was successful (pcBlockProcessed) or it has led to an error (pcBlockVerificationFailure).

Scheduler

The scheduler contains the business logic for tracking peers and determining which block to request from whom. The scheduler needs to maintain state on:

type blockState int

const (
    blockStateUnknown   blockState = iota + 1 // no known peer has this block
    blockStateNew                             // indicates that a peer has reported having this block
    blockStatePending                         // indicates that this block has been requested from a peer
    blockStateReceived                        // indicates that this block has been received by a peer
    blockStateProcessed                       // indicates that this block has been applied
)

type scheduler struct {
    initHeight uint64

    // next block that needs to be processed. All blocks with smaller height are
    // in Processed state.
    height uint64

    // lastAdvance tracks the last time a block execution happened.
    // syncTimeout is the maximum time the scheduler waits to advance in the fast sync process before finishing.
    // This covers the cases where there are no peers or all peers have a lower height.
    lastAdvance time.Time
    syncTimeout time.Duration

    // a map of peerID to scheduler specific peer struct `scPeer` used to keep
    // track of peer specific state
    peers       map[p2p.ID]*scPeer
    peerTimeout time.Duration   // maximum response time from a peer otherwise prune
    minRecvRate int64             // minimum receive rate from peer otherwise prune

    // the maximum number of blocks that should be New, Received or Pending at any point
    // in time. This is used to enforce a limit on the blockStates map.
    targetPending int
    // a list of blocks to be scheduled (New), Pending or Received. Its length should be
    // smaller than targetPending.
    blockStates map[uint64]blockState

    // a map of heights to the peer we are waiting a response from
    pendingBlocks map[uint64]p2p.ID

    // the time at which a block was put in blockStatePending
    pendingTime map[uint64]time.Time

    // a map of heights to the peers that put the block in blockStateReceived
    receivedBlocks map[uint64]p2p.ID
}

The scheduler receives relevant protocol messages from peers (for example bcBlockResponse and bcStatusResponse), but also internal events that are the result of the block processing in the processor (the events carry the information of whether a block was successfully processed or there was an error). The scheduler schedules block requests by emitting internal events (scBlockRequest) and also informs the processor about internal processing, for example, when block response is received (scBlockReceived) or if there is an error in peer behaviour (scPeerError). It is configured to maintain a target n of in flight messages and will use feedback from bcBlockResponse, bcStatusResponse and scPeerError to produce an optimal assignment of rTrySchedule at each doScheduleCh ticker.

func (sc *scheduler) handle(event Event) (Event, error) {
    switch event := event.(type) {
    case bcResetState:
        nextEvent, err := sc.handleResetState(event)
        return nextEvent, err
    case bcStatusResponse:
        nextEvent, err := sc.handleStatusResponse(event)
        return nextEvent, err
    case bcBlockResponse:
        nextEvent, err := sc.handleBlockResponse(event)
        return nextEvent, err
    case bcNoBlockResponse:
        nextEvent, err := sc.handleNoBlockResponse(event)
        return nextEvent, err
    case rTrySchedule:
        nextEvent, err := sc.handleTrySchedule(event)
        return nextEvent, err
    case bcAddNewPeer:
        nextEvent, err := sc.handleAddNewPeer(event)
        return nextEvent, err
    case bcRemovePeer:
        nextEvent, err := sc.handleRemovePeer(event)
        return nextEvent, err
    case rTryPrunePeer:
        nextEvent, err := sc.handleTryPrunePeer(event)
        return nextEvent, err
    case pcBlockProcessed:
        nextEvent, err := sc.handleBlockProcessed(event)
        return nextEvent, err
    case pcBlockVerificationFailure:
        nextEvent, err := sc.handleBlockProcessError(event)
        return nextEvent, err
    default:
        return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil
    }
}
...

// lowest block in sc.blockStates with state == blockStateNew or -1 if no new blocks
func (sc *scheduler) nextHeightToSchedule() uint64 {
    var min uint64 = math.MaxUint64
    for height, state := range sc.blockStates {
        if state == blockStateNew && height < min {
            min = height
        }
    }
    if min == math.MaxUint64 {
        min = 0
    }
    return min
}

func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) {
    if time.Since(sc.lastAdvance) > sc.syncTimeout {
        return scFinishedEv{reason: "timeout, no advance"}, nil
    }

    nextHeight := sc.nextHeightToSchedule()
    if nextHeight == 0 {
        return noOp, nil
    }

    bestPeerID, err := sc.selectPeer(nextHeight)
    if err != nil {
        return scSchedulerFail{reason: err}, nil
    }
    if err := sc.markPending(bestPeerID, nextHeight, event.time); err != nil {
        return scSchedulerFail{reason: err}, nil // XXX: peerError might be more appropriate
    }
    return scBlockRequest{peerID: bestPeerID, height: nextHeight}, nil
}
...

type scPeer struct {
    peerID p2p.ID

    // initialized as New when peer is added, updated to Ready when statusUpdate is received,
    // updated to Removed when peer is removed
    state peerState

    base        uint64 // updated when statusResponse is received
    height      uint64 // updated when statusResponse is received
    lastTouched time.Time
    lastRate    int64 // last receive rate in bytes
}

Once the Fastsync protocol terminates, this is signaled to the KardiaChain consensus component (denoted ConsensusManager) with a trySwitchToConsensus event.

trinhdn97 commented 3 years ago

Blockchain reactor implemented at https://github.com/kardiachain/go-kardia/pull/129 in order to resolve this issue.