跳转至

Raft

2630 个字 48 行代码 预计阅读时间 9 分钟

Abstract

  • Raft 是一种复制状态机协议,一个复制服务通过多个副本 server 上存储其状态(数据)的完整副本来实现容错,复制使其即使部分 server 出现故障(崩溃或网络中断、不稳定,服务仍能继续运行,但问题在于,故障可能导致各个副本保存的数据不一致;
  • Raft client requests 组织成一个序列(日志,并确保所有副本 servers 都能看到相同的日志。每个副本按照日志的顺序执行 client requests,将其应用于服务状态的本地副本;
  • 由于所有正常运行的副本都能看到相同的日志内容,它们都会以相同顺序执行相同的 requests,从而保持一致性服务状态;
  • 如果某个 server 发生故障但之后恢复,Raft 会负责使其日志更新到最新状态;
  • 只要至少大多数 server 处于活跃状态且能够相互通信,Raft 就会继续运行;
  • Raft 只能满足 CAP 理论中的 CP
  • paper: 📖 In Search of an Understandable Consensus Algorithm
  • home: https://pdos.csail.mit.edu/6.824/labs/lab-raft1.html
  • Students' Guide to Raft
  • Visualization of Raft

Lab Materials

  • raft1/raft.go:Raft 的框架代码;
  • raft1/raft_test.go:Lab 3 的所有测试代码,必要时要自己构造测试,然后 debug 发现问题;

Part 3A: leader election

leader election

  • 实现 Raft leader 选举和 heartbeats 机制(不带日志条目的 AppendEntries RPC
  • 目标是选举出一个单一的 leader,在没有故障的情况下该 leader 保持其地位,并且当旧 leader 发生故障或旧 leader 之间的数据包丢失时,有新 leader 接管;
  • 选举定时器的重置时机;

Raft 节点

  1. 单个 raft 节点、RequestVote RPC、AppendEntries RPC 的各个变量需完全遵循 paper 里的 2
  2. 每个 raft 节点可以是 leader | candidate | follower,都可以发送和接收 RPC,维护唯一一个运行的 goroutine
  3. 每个 raft 节点都有一个选举定时器,每次收到 heartbeat log RPC 时,都会重置选举定时器,发送者是 leader,接收者是 follwer,一旦选举定时器超时,节点转化为 candidate 并进行投票选举;

投票

ticker():运行在每个节点上,判断是否需要投票。

发送方

  • 若当前节点不是 leader 并且上一次心跳间隔 > 选举定时器,就要触发 leader election
  • 变量的定义,state 变为 candidateTerm++、给自己投票、重置心跳时间戳等;
  • 依次请求其他 raft 节点进行投票 sendRequestVote(),注意 Term 的变更;
  • 如果获得超过一半的票数则当选 leader,并随后给其他 follower 发送心跳;

接收方

  • RequestVote() 接收来自 candidate 的投票请求;
  • 比较 Term、是否投过票、投的票的是不是 CandidateId 等返回VoteGranted ;

Heartbeat RPC

leader 在任期期间,会不间断地发送心跳给所有 follower,防止触发超时选举;

发送方

  • 采用轮询的方式,设置一个心跳间隔,依次发送心跳;
  • 与投票类似,区别在于当前角色是不是 leader,如果是则发送心跳给其他 follower,否则退出重新选举;

接收方

  • AppendEntries():接收来自 leader 的心跳或日志,当前 Part 3A 不涉及日志相关,所以只更新一些参数变更如 term、心跳时间戳等;

Tip

  1. 若投票超半数,则当选 leader 并广播给其他节点,否则重新一轮选举;
  2. 避免脑裂:每个 candidate 都给自己投票,如果只有两个 follower 并且转化成 candidate 发起选举,都给自己投一票,就会形成脑裂,解决办法就是随机设置选举定时器的超时时间,并且至少超过 leader 的心跳间隔;
  3. 心跳机制和发起投票是并行的,即使当前 candidate 正在选举,一旦选举计时器触发,应该开始另一次选举,避免 RPC 延迟或丢失;

Part 3B: log

log

  • 实现日志存储;
  • 心跳和日志的区别?
  • 日志的两阶段提交如何实现?日志复制的逻辑?
  • leader follower 接收到新的日志时,各个 index 如何处理?
  • 如果 follower 的日志不一致时如何恢复?
  • follower 的日志不一致时,有无优化机制用于减少日志同步的 RPC 次数,而非逐条日志回退?

心跳和日志

  • 日志是带有 logEntry 的心跳,心跳是长度为 0 的日志;
  • 每个节点都需要将 log 写入到磁盘中;
  • 心跳仅有 Term,而日志还带有 Command
  • leader 除了维护自己的 log,还要同步 follower log,必要时解决冲突;
  • 心跳和日志都使用同一个发射器,通过 nextIndexlen(log) 的大小来判断是心跳还是日志;
  • 为了简化操作,心跳和日志共同使用AppendEntries RPC 来处理;

Start()

  • 仅仅将 command 追加到 leader log 中,不需要保证 command 是否提交,交给心跳 / 日志协程处理;
  • 为了效率,当有命令到达时,需要立即触发一次心跳,但防止只包含一个日志的心跳过多,导致 RPC 调用频繁,所以可以等待一个比心跳间隔小的时间,收集这个时间段的日志后一起处理;

两阶段提交

日志复制

  1. 客户端请求leader 收到 client command 后,追加到自己的本地 log 中;
  2. 并发分发leader 通过AppendEntries RPC 将这条日志并行地分发到集群中的所有 follower
  3. 预持久化follower 收到日志后,会先处理冲突日志,保证不冲突会将其追加到自己的本地 log 中(此时日子处于未提交 / 不安全状态,并向 leader 返回 Sucess = true

日志提交

  1. 达成多数派:当 leader 收到超过半数的 true 时,认为该日志已经安全;
  2. 更新 commitIndexleader 将这条日志标记为 commited,并 applied 到自己的状态机中;
  3. 通知 followerleader 在之后的心跳中,会携带最新的 LeaderCommit 索引,follower 收到后也会将该日志 applied 到自己的状态机中。

应用状态机

  • 同样是一个轮询的协程,用条件变量来实现;
  • 不断检查 rf.commitIndex > rf.lastApplied,然后封装 ApplyMsg 并通过 applyCh 发送给应用层;

日志冲突

快速回退

当日志的长度小于 args.PrevLogIndex 或者该 index 下的 Term 不同时,触发快速回退;

RPC 次数

当日志冲突时,如果采用逐条日志回退,导致 AppendEntries RPC 次数增多,所以采用快速回退的优化机制,能使 leader 能够一次性地跳过整个冲突的 Term,减少 RPC 次数。

  • 日志冲突条件:len(rf.log) <= args.PrevLogIndex || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm
  • 定义三个变量 XTerm、XLen、XIndex:
    • XTerm:冲突 log 对应的 Term,如果 PrevLogIndex 位置上没有 log 则为 -1
    • XIndex:follower 上对应 Term XTerm 的第一条 Log index
    • XLen:空白的 Log 槽位数(代码中写为 follower 日志的实际总长度
follower 发现日志冲突
// fast rollback
reply.XLen = len(rf.log)
if len(rf.log) <= args.PrevLogIndex {
    reply.XTerm = -1
    reply.XIndex = -1
} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
    reply.XTerm = rf.log[args.PrevLogIndex].Term
    reply.XIndex = args.PrevLogIndex
    for reply.XIndex > 0 && rf.log[reply.XIndex-1].Term == reply.XTerm {
        reply.XIndex -= 1
    }
}
leader 调整日志冲突
if reply.XTerm == -1 {
    rf.nextIndex[server] = reply.XLen
} else {
    if rf.log[reply.XIndex].Term != reply.XTerm {
        rf.nextIndex[server] = reply.XIndex
    } else {
        for j := reply.XIndex; j < len(rf.log); j++ {
            if rf.log[j].Term != reply.XTerm {
                rf.nextIndex[server] = j
                break
            }
        }
    }
}
follower 处理日志冲突
for i, entry := range args.Entries {
    index := args.PrevLogIndex + 1 + i
    if index >= len(rf.log) || rf.log[index].Term != entry.Term {
        rf.log = append(rf.log[:index], args.Entries[i:]...)
        break
    }
}

普通冲突

args.PrevLogIndex 下的 Term 相同时,不能保证后续 index 下的 Term 继续相同,需要截断处理,并且追加该 index 后的新 log

  • 假设 leader index term 分别为 [1, 1, 2, 3]args.PrevLogIndex 3
  1. case 1:follower [1, 1, 2, 2],发现最后一个 Term 不同,所以先截断 [2] 然后添加 [3] 处的日志;
  2. case 2:follower [1, 1, 2, 3, 3, 3],发现 follower leader 的日志保持一致并且多出 [3, 3],根据 paper 中提到 “If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)”,不需要截断超出的 log(可能是网络延迟或故障,leader 实际上已经有了 [3, 3],当然截断超出的 log 也是正确的;
for i, entry := range args.Entries {
    index := args.PrevLogIndex + 1 + i
    if index >= len(rf.log) || rf.log[index].Term != entry.Term {
        rf.log = append(rf.log[:index], args.Entries[i:]...)
        break
    }
}

Tip

paper 中图 2 说到at least as up-to-date as receiver’s log, grant vote”,因此判断的条件有两个(而我踩的坑是 "index" "term≥")

  • LastLogTerm > rf.log[len(rf.log)-1].Term
  • LastLogTerm == rf.log[len(rf.log)-1].Term && LastLogIndex ≥ len(rf.log)-1

Part 3C: persistence

persistence

  • 如果一个 server 挂掉后重启,需要利用持久化存储的数据恢复节点的状态;
  • 持久化的内容是什么?
  • 什么时候持久化?

持久化内容

  1. votedFor:当前 Term 内收到本节点选票的 CandidateId
    • 确保一个 Term 只能投一票;
    • 如果没有持久化,节点重启后不知道自己是否投过票,可能导致在同一 Term 投票给另一个 Candidate,随后出现两个 leader
  2. currentTerm:服务器已知最新的 Term
    • 防止一个节点在同一 Term 内投出两张票,或者旧 leader 重启时认为自己仍是当前 leader
  3. log:需要 log 来恢复状态机,并与其他节点进行日志匹配与同步;
  4. 其他数据不需要持久化,因为可以通过心跳的方式逐步推理获取;

持久化时机

  • 数据持久化到硬盘的开销远大于 RPC,所以在除开必要持久化外得越少越好;
  • 所有修改 votedForcurrentTermlog 操作后,必须要持久化;

Part 3D: log compaction

log compaction

  • Part 3C 中虽然实现了持久化,但是对于长期运行的 server 来说,持久化完整的 log 并不现实,因此日志压缩为 snapshot,对于 Raft 会丢弃快照之前的 log,可以大大减少持久化数据量并加快重启速度;
  • 某个 follower 落后太多,导致 leader 已经丢弃了用于追加的 log,所以 leader 需要发送快照以及从该快照开始的 log
  • 既然会丢弃部分 log,那么这部分 log 的缺失会出现什么问题?
  • 哪些节点需要压缩成 snapshot
  • 快照存储什么数据?快照是否需要持久化?
  • follower 发现自己的 log 已经落后于 leaderleader 如何通知 follower 使用快照进行替换?

日志截断

  • 根据论文中的描述,当日志压缩时,需要进行日志截断,通俗来说就是只保留最后一个日志作为开头,而其他日志全部删除,但这样如果要恢复快照,就需要三个参数进行恢复:
    • snapshot:快照数据
    • lastIncludedIndex: 日志中最高索引
    • lastIncludedTerm: 日志中最高 Term
  • 当存在日志截断后,Raft 节点中的各参数需要位置偏移,比如取 rf.log 中的数据时,需要 - lastIncludedIndex,而取 rf.log 的长度时,需要加 + lastIncludedIndex,既然如此,声明两个函数来操作:
// 使用 log 切片时使用的索引
func (rf *Raft) RealLogIndex(virtualLogIndex int) int {
    return virtualLogIndex - rf.lastIncludedIndex
}
// 全局真实递增索引
func (rf *Raft) VirtualLogIndex(realLogIndex int) int {
    return realLogIndex + rf.lastIncludedIndex
}

Snapshot()

接收应用层的快照请求,并截断 log 数组;

  1. 判断是否接受 snapshot
  2. 保存 snapshot
  3. 更新 lastIncludedIndexlastIncludedTerm ;

持久化

  • 新增三个持久化内容,lastIncludedIndexlastIncludedTermsnapshot ;

InstallSnapshot RPC

当某个某个节点落后了 nextIndex - 1 < lastIncludedIndex,需要发送快照;

发送方

  • leader 发现 follower 要求回退的日志已经被截断时,触发 InstallSnapshot
  • InstallSnapshot 触发时机:
    • leader 发送心跳之前判断 follower 是否落后于快照;
    • leader 发送心跳后,如果发现日志冲突了,更改 rf.nextIndex 时再判断 follower 是否落后于快照;
    • 只要 nextIndex < lastIncludedIndex,就要触发;
  • 触发成功后,要重新设置 nextIndexmatchIndex
  • 为了防止频繁触发 InstallSnapshot,可以先设置 rf.nextIndex,而不立刻 go InstallSnapshot,交给下次心跳来触发;

接收方

  • 如果 args.LastIncludedIndex 处存在日志,那么需要日志截断并创建快照;
  • 如果 args.LastIncludedIndex 处不存在日志,那么需要清空日志;
  • 更新 lastAppliedcommitIndex ;
  • 完成操作后,需要封装 ApplyMsg 并通过 applyCh 发送给应用层;

数组越界

  • 由于加入了日志截断,所以有 RealLogIndexVirtualLogIndex,还有 lastIncludedIndex 0 的区别,很可能导致书逐月节;
  • 例如:
    • leader 没有进行 nextIndexmatchIndex 初始化
    • 快速回退中 XIndex > lastIncludedIndex 而不是 0

Reference