ninehills / blog

https://ninehills.tech
758 stars 68 forks source link

[MIT 6.824 分布式系统课程] Lab2 Raft 心得 #62

Open ninehills opened 6 years ago

ninehills commented 6 years ago

Raft struct 的成员

1 Raft 节点的角色,可以使用Go常量

const (
    Follower = iota         // 0
    Candidate       // 1
    Leader          // 2
)

2 使用Buffered Channel进行异步通信,比如等待心跳包结果等情况,我定义了如下Channel

    chanHeartbeat chan bool // 收到心跳
    chanWinVote chan bool   // 赢得选举
    chanGrantVote chan bool // 获得选举票

        chanApply chan ApplyMsg // 用来commit的channel

Make()中需要初始化Channel为Buffered Channel

    rf.chanWinVote = make(chan bool, 10)
    rf.chanGrantVote = make(chan bool, 10)
    rf.chanHeartbeat = make(chan bool, 10)

3 Raft struct大部分成员都是论文的Figure 2的内容

4 发送broadcastRequestVote后,需要进行voteCount 计数,以确定是否赢得Vote,所以 Raft struct需要增加voteCount成员

协程冲突问题

Raft有如下成员来进行加锁

mu        sync.Mutex          // Lock to protect shared access to this peer's state

使用方法一般有两种,一种是在明确的边界区进行

rf.mu.Lock()
......
rf.mu.Unlock()

还有一种是在函数头

rf.mu.Lock()
defer rf.mu.Unlock()
........

加锁时要小心加锁,以确保不会发生死锁,能不加锁就不加锁。此外测试时使用go test -race来判断是否发生冲突

善用 go 协程机制

一般用在异步触发,比如广播requestVote、AppendEntries或者异步Commit上

完整实现论文中的协议,而不要靠猜测

论文中所有的协议都需要准确实现,否则很容易出错(在互联网上找到的实现中,大部分都通不过测试),几个容易出错的地方如下,都debug了很久

// RequestVote中判断日志是否up-to-date,应该如下实现
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
            (args.LastLogTerm > rf.logs[rf.getLastIndex()].Term ||
                (args.LastLogTerm == rf.logs[rf.getLastIndex()].Term && args.LastLogIndex >= rf.getLastIndex())) {
        // Raft determines which of two logs is more up-to-date by comparing the
        // index and term of the last entries in the logs. If the logs have last
        // entries with different terms, then the log with the later term is
        // more up-to-date. If the logs end with the same term, then whichever
        // log is longer is more up-to-date.
        reply.VoteGranted = true
        rf.votedFor = args.CandidateId
    } else {
        reply.VoteGranted = false
    }
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    /*  Receiver implementation:
    1. Reply false if term < currentTerm (§5.1)
    2. Reply false if log doesn’t contain an entry at prevLogIndex
    whose term matches prevLogTerm (§5.3)
    3. If an existing entry conflicts with a new one (same index
    but different terms), delete the existing entry and all that
    follow it (§5.3)
    4. Append any new entries not already in the log
    5. If leaderCommit > commitIndex, set commitIndex =
        min(leaderCommit, index of last new entry)  */
    rf.mu.Lock()
    defer rf.mu.Unlock()
    rf.chanHeartbeat <- true

    if args.Term > rf.currentTerm {
        rf.log("AppendEntries Term Bigger, convert to follower")
        rf.role = Follower
        rf.currentTerm = args.Term
        rf.votedFor = -1
    }
    DPrintf("1 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
    reply.Term = rf.currentTerm
    reply.NextTryIndex = -1 //默认为-1,代表index--,此处为简化逻辑,除日志不匹配的其他情况都统一--
    if args.Term < rf.currentTerm {
        reply.Success = false
    } else if len(rf.logs) <= args.PrevLogIndex || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
        // Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
        if len(rf.logs) <= args.PrevLogIndex{
            DPrintf("logsLen(%d) prevLogIndex(%d)",
                len(rf.logs), args.PrevLogIndex)
        } else {
            DPrintf("logsLen(%d) prevLogIndex(%d) logsTerm(%d) prevLogTerm(%d)",
                len(rf.logs), args.PrevLogIndex, rf.logs[args.PrevLogIndex].Term, args.PrevLogTerm)
        }
        if len(rf.logs) > args.PrevLogIndex {
            // 如果日志内容不匹配,找到同Term中最早的Index,直接回退到那个Index
            term := rf.logs[args.PrevLogIndex].Term
            for reply.NextTryIndex = args.PrevLogIndex - 1;
                reply.NextTryIndex > 0 && rf.logs[reply.NextTryIndex].Term == term; reply.NextTryIndex-- {}
            reply.NextTryIndex++
        } else {
            // 如果日志长度不匹配,则以当前的日志长度为准
            reply.NextTryIndex = rf.getLastIndex() + 1
        }
        reply.Success = false
    } else {
        // If an existing entry conflicts with a new one (same index but different terms),
        // delete the existing entry and all that follow it.
        // ------------------------
        // The if here is crucial. If the follower has all the entries the leader sent,
        // the follower MUST NOT truncate its log. Any elements following the entries
        // sent by the leader MUST be kept. This is because we could be receiving an
        // outdated AppendEntries RPC from the leader, and truncating the log would
        // mean “taking back” entries that we may have already told the leader that
        // we have in our log.
        // ------- wrong code -------
        // preLogs := rf.logs[:args.PrevLogIndex + 1]
        // preLogs = append(preLogs, args.Entries...)
        // rf.log(fmt.Sprintf("%d", preLogs))
        // rf.logs = preLogs
        // --------------------------
        rf.log(fmt.Sprintf("Log append: rfLogs(%d) entries(%d) preLogIndex(%d)",
            rf.logs, args.Entries, args.PrevLogIndex))
        for i := 0; i < len(args.Entries); i++ {
            if i + args.PrevLogIndex + 2 > len(rf.logs) {
                // 如果Entries 长过了 rf.logs,那么直接append
                rf.logs = append(rf.logs, args.Entries[i])
            } else if rf.logs[i + args.PrevLogIndex + 1].Term != args.Entries[i].Term {
                rf.logs = rf.logs[:i + args.PrevLogIndex + 1]
                rf.logs = append(rf.logs, args.Entries[i])
            }
        }
        rf.log(fmt.Sprintf("Log append result: rfLogs(%d)", rf.logs))
        if args.LeaderCommit > rf.commitIndex {
            if args.LeaderCommit > rf.getLastIndex() {
                rf.commitIndex = rf.getLastIndex()
            } else {
                rf.commitIndex = args.LeaderCommit
            }
            if rf.commitIndex > rf.lastApplied {
                go rf.commit()
            }
        }
        reply.Success = true
    }
    DPrintf("2 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
    rf.log(fmt.Sprintf("reply AppendEntries from %d: %t", args.LeaderId, reply.Success))

    rf.persist()
}
// 还有  sendAppendEntries 返回成功后的处理
    if reply.Success {
        // If successful: update nextIndex and matchIndex for follower (§5.3)
        // 此处注意不能简单更新为 rf.getLastIndex(),因为sendAppendEntries是异步的,send的时候最新的index有可能已经变了
        rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
        rf.nextIndex[server] = rf.matchIndex[server] + 1

Debug的方法

可以给Raft struce定义一个log方法,打印一些基本信息,如Term等,这样就不用每次都拼接。例如

func (rf *Raft) log(message string) {
    // 封装日志方法
    if Debug > 0 {
        log.Printf("node(%d)role(%d)term(%d)commitIndex(%d)logs(%d): %s",
            rf.me, rf.role, rf.currentTerm, rf.commitIndex, rf.logs, message)
    }
}

当然这里图省事用的string,可以换成format,参见utils.go下的DPrintf的实现

测试

测试随机性比较大,一次测试通过不能代表没问题,建议跑多轮测试都没有问题才算通过

选举超时和心跳间隔的设定

    // 选举超时,Paper中写 150–300ms 随机,测试环境是1000ms
    rand.Seed(time.Now().UnixNano())
    electionTimeout := func() time.Duration {
        return time.Duration(500 + rand.Intn(150)) * time.Millisecond
    }
    // broadcastTime << electionTimeout << MTBF(平均无故障工作时间)
    // << 代表数量级的差别
    heartBeatInterval := time.Duration(50) * time.Millisecond

选举状态机的实现

for {
        rf.log("loop start")
        rf.mu.Lock()
        role := rf.role
        rf.mu.Unlock()
        switch role {
        case Follower:
            // Respond to RPCs from candidates and leaders
            // If election timeout elapses without receiving AppendEntries RPC from current leader
            // or granting vote to candidate: convert to candidate
            select {
                case <-rf.chanHeartbeat:
                case <-rf.chanGrantVote:
                case <-time.After(electionTimeout()):
                    rf.mu.Lock()
                    rf.log("follower no heartbeat/vote and timeout, convert to candidate")
                    rf.role = Candidate
                    rf.mu.Unlock()
            }
        case Candidate:
            // On conversion to candidate, start election:
            // - Increment currentTerm
            // - Vote for self
            // - Reset election timer
            // - Send RequestVote RPCs to all other servers
            // If votes received from majority of servers: become leader
            // If AppendEntries RPC received from new leader: convert to follower
            // If election timeout elapses: start new election
            rf.mu.Lock()
            rf.currentTerm++
            rf.votedFor = rf.me
            rf.voteCount = 1    // 自己算一票
            rf.persist()
            rf.mu.Unlock()
            go rf.broadcastRequestVote()
            select {
            case <-rf.chanWinVote:
                rf.mu.Lock()
                if len(rf.chanHeartbeat) > 0 {
                    <-rf.chanHeartbeat
                }
                rf.log("win vote, convert to leader")
                if rf.role != Leader {
                    // 有一种情况是 chanWinVote 和 chanHeartbeat同时收到消息,这时候Go会随机选择一个case
                    // 但是 chanWinVote中的数据还在,会干扰到下次选举
                    // 临时的解决办法是在配置chanWinVote的时候同时设定role,此处二次验证
                    rf.log("chanWinVote has outdated message, ignore it.")
                    continue
                }
                // 初始化leader的变量
                rf.nextIndex = make([]int, len(rf.peers))
                rf.matchIndex = make([]int, len(rf.peers))
                for i := range rf.peers {
                    rf.nextIndex[i] = rf.getLastIndex() + 1
                    rf.matchIndex[i] = 0
                }
                rf.mu.Unlock()
            case <-rf.chanHeartbeat:
                rf.mu.Lock()
                rf.log("received heartbeat, convert to follower")
                rf.role = Follower
                rf.mu.Unlock()
            case <-time.After(electionTimeout()):
                rf.log("election timeout, skip")
            }
        case Leader:
            // Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server;
            //      repeat during idle periods to prevent election timeouts (§5.2)
            // If command received from client: append entry to local log,
            //      respond after entry applied to state machine (§5.3)
            // If last log index ≥ nextIndex for a follower:
            //      send AppendEntries RPC with log entries starting at nextIndex
            // - If successful: update nextIndex and matchIndex for follower (§5.3)
            // - If AppendEntries fails because of log inconsistency:
            //      decrement nextIndex and retry (§5.3)
            // If there exists an N such that N > commitIndex, a majority
            //      of matchIndex[i] ≥ N, and log[N].term == currentTerm:
            //      set commitIndex = N (§5.3, §5.4).
            rf.broadcastAppendEntries()
            time.Sleep(heartBeatInterval)
        }
    }

追求代码的美

在完成的过程中以及完成后,因为没有人帮忙判卷,看了一些现在互联网已经有的实现作为参考,总的来说,都只是为了完成而完成,注释也好、代码组织形式也好都不是很重视。可能过上几个月就忘记了以前为什么这么写的原因,这点对于Raft协议这种实现的稍微有点差池就出大问题的,非常重要。

ninehills commented 6 years ago

因为这个课程不希望大家Public代码,所以没有公开代码,有需要的可以单独发邮件索取。

hd5970 commented 6 years ago

选举状态机的实现里, 在sleep心跳时长的时候, 没有办法接受处理AppendEntriesRPC

ninehills commented 5 years ago

选举状态机的实现里, 在sleep心跳时长的时候, 没有办法接受处理AppendEntriesRPC

broadcastAppendEntries()里会用协程的方式启动多个 sendAppendEntries,Reply由此处理。