Raft¶
约 2630 个字 48 行代码 预计阅读时间 9 分钟
Abstract
- Raft 是一种复制状态机协议,一个复制服务通过多个副本 server 上存储其状态(数据)的完整副本来实现容错,复制使其即使部分 server 出现故障(崩溃或网络中断、不稳定
) ,服务仍能继续运行,但问题在于,故障可能导致各个副本保存的数据不一致; - Raft 将 client requests 组织成一个序列(日志
) ,并确保所有副本 servers 都能看到相同的日志。每个副本按照日志的顺序执行 client requests,将其应用于服务状态的本地副本; - 由于所有正常运行的副本都能看到相同的日志内容,它们都会以相同顺序执行相同的 requests,从而保持一致性服务状态;
- 如果某个 server 发生故障但之后恢复,Raft 会负责使其日志更新到最新状态;
- 只要至少大多数 server 处于活跃状态且能够相互通信,Raft 就会继续运行;
- Raft 只能满足 CAP 理论中的 CP;
- paper:
In Search of an Understandable Consensus Algorithm
- home: https://pdos.csail.mit.edu/6.824/labs/lab-raft1.html
- Students' Guide to Raft
- Visualization of Raft
Lab Materials
- raft1/raft.go:Raft 的框架代码;
- raft1/raft_test.go:Lab 3 的所有测试代码,必要时要自己构造测试,然后 debug 发现问题;
Part 3A: leader election¶
leader election
- 实现 Raft 的 leader 选举和 heartbeats 机制(不带日志条目的 AppendEntries RPC
) ; - 目标是选举出一个单一的 leader,在没有故障的情况下该 leader 保持其地位,并且当旧 leader 发生故障或旧 leader 之间的数据包丢失时,有新 leader 接管;
- 选举定时器的重置时机;
Raft 节点 ¶
- 单个 raft 节点、RequestVote RPC、AppendEntries RPC 的各个变量需完全遵循 paper 里的图 2!
- 每个 raft 节点可以是 leader | candidate | follower,都可以发送和接收 RPC,维护唯一一个运行的 goroutine;
- 每个 raft 节点都有一个选举定时器,每次收到 heartbeat 或 log 的 RPC 时,都会重置选举定时器,发送者是 leader,接收者是 follwer,一旦选举定时器超时,节点转化为 candidate 并进行投票选举;
投票 ¶
ticker():运行在每个节点上,判断是否需要投票。
发送方 ¶
- 若当前节点不是 leader 并且上一次心跳间隔 > 选举定时器,就要触发 leader election;
- 变量的定义,state 变为 candidate、
Term++、给自己投票、重置心跳时间戳等; - 依次请求其他 raft 节点进行投票
sendRequestVote(),注意Term的变更; - 如果获得超过一半的票数则当选 leader,并随后给其他 follower 发送心跳;
接收方 ¶
RequestVote()接收来自 candidate 的投票请求;- 比较
Term、是否投过票、投的票的是不是 CandidateId 等返回VoteGranted;
Heartbeat RPC¶
leader 在任期期间,会不间断地发送心跳给所有 follower,防止触发超时选举;
发送方 ¶
- 采用轮询的方式,设置一个心跳间隔,依次发送心跳;
- 与投票类似,区别在于当前角色是不是 leader,如果是则发送心跳给其他 follower,否则退出重新选举;
接收方 ¶
AppendEntries():接收来自 leader 的心跳或日志,当前 Part 3A 不涉及日志相关,所以只更新一些参数变更如 term、心跳时间戳等;
Tip
- 若投票超半数,则当选 leader 并广播给其他节点,否则重新一轮选举;
- 避免脑裂:每个 candidate 都给自己投票,如果只有两个 follower 并且转化成 candidate 发起选举,都给自己投一票,就会形成脑裂,解决办法就是随机设置选举定时器的超时时间,并且至少超过 leader 的心跳间隔;
- 心跳机制和发起投票是并行的,即使当前 candidate 正在选举,一旦选举计时器触发,应该开始另一次选举,避免 RPC 延迟或丢失;
Part 3B: log¶
log
- 实现日志存储;
- 心跳和日志的区别?
- 日志的两阶段提交如何实现?日志复制的逻辑?
- leader 和 follower 接收到新的日志时,各个 index 如何处理?
- 如果 follower 的日志不一致时如何恢复?
- 但 follower 的日志不一致时,有无优化机制用于减少日志同步的 RPC 次数,而非逐条日志回退?
心跳和日志 ¶
- 日志是带有 logEntry 的心跳,心跳是长度为 0 的日志;
- 每个节点都需要将 log 写入到磁盘中;
- 心跳仅有 Term,而日志还带有 Command;
- leader 除了维护自己的 log,还要同步 follower 的 log,必要时解决冲突;
- 心跳和日志都使用同一个发射器,通过
nextIndex和len(log)的大小来判断是心跳还是日志; - 为了简化操作,心跳和日志共同使用
AppendEntriesRPC 来处理;
Start()¶
- 仅仅将 command 追加到 leader 的 log 中,不需要保证 command 是否提交,交给心跳 / 日志协程处理;
- 为了效率,当有命令到达时,需要立即触发一次心跳,但防止只包含一个日志的心跳过多,导致 RPC 调用频繁,所以可以等待一个比心跳间隔小的时间,收集这个时间段的日志后一起处理;
两阶段提交 ¶
日志复制 ¶
- 客户端请求:leader 收到 client 的 command 后,追加到自己的本地 log 中;
- 并发分发:leader 通过
AppendEntriesRPC 将这条日志并行地分发到集群中的所有 follower; - 预持久化:follower 收到日志后,会先处理冲突日志,保证不冲突会将其追加到自己的本地 log 中(此时日子处于未提交 / 不安全状态
) ,并向 leader 返回Sucess = true;
日志提交 ¶
- 达成多数派:当 leader 收到超过半数的 true 时,认为该日志已经安全;
- 更新 commitIndex:leader 将这条日志标记为
commited,并 applied 到自己的状态机中; - 通知 follower:leader 在之后的心跳中,会携带最新的
LeaderCommit索引,follower 收到后也会将该日志 applied 到自己的状态机中。
应用状态机 ¶
- 同样是一个轮询的协程,用条件变量来实现;
- 不断检查
rf.commitIndex > rf.lastApplied,然后封装 ApplyMsg 并通过 applyCh 发送给应用层;
日志冲突 ¶
快速回退 ¶
当日志的长度小于
args.PrevLogIndex或者该 index 下的Term不同时,触发快速回退;
RPC 次数
当日志冲突时,如果采用逐条日志回退,导致 AppendEntries 的 RPC 次数增多,所以采用快速回退的优化机制,能使 leader 能够一次性地跳过整个冲突的 Term,减少 RPC 次数。
- 日志冲突条件:
len(rf.log) <= args.PrevLogIndex || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm; - 定义三个变量 XTerm、XLen、XIndex:
- XTerm:冲突 log 对应的 Term,如果 PrevLogIndex 位置上没有 log 则为 -1;
- XIndex:follower 上对应 Term 为 XTerm 的第一条 Log 的 index;
- XLen:空白的 Log 槽位数(代码中写为 follower 日志的实际总长度
) ;
follower 发现日志冲突 ¶
// fast rollback
reply.XLen = len(rf.log)
if len(rf.log) <= args.PrevLogIndex {
reply.XTerm = -1
reply.XIndex = -1
} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.XTerm = rf.log[args.PrevLogIndex].Term
reply.XIndex = args.PrevLogIndex
for reply.XIndex > 0 && rf.log[reply.XIndex-1].Term == reply.XTerm {
reply.XIndex -= 1
}
}
leader 调整日志冲突 ¶
if reply.XTerm == -1 {
rf.nextIndex[server] = reply.XLen
} else {
if rf.log[reply.XIndex].Term != reply.XTerm {
rf.nextIndex[server] = reply.XIndex
} else {
for j := reply.XIndex; j < len(rf.log); j++ {
if rf.log[j].Term != reply.XTerm {
rf.nextIndex[server] = j
break
}
}
}
}
follower 处理日志冲突 ¶
for i, entry := range args.Entries {
index := args.PrevLogIndex + 1 + i
if index >= len(rf.log) || rf.log[index].Term != entry.Term {
rf.log = append(rf.log[:index], args.Entries[i:]...)
break
}
}
普通冲突 ¶
当
args.PrevLogIndex下的Term相同时,不能保证后续index下的Term继续相同,需要截断处理,并且追加该index后的新 log;
- 假设 leader 的 index 和 term 分别为 [1, 1, 2, 3],
args.PrevLogIndex为 3;
- case 1:follower [1, 1, 2, 2],发现最后一个 Term 不同,所以先截断 [2] 然后添加 [3] 处的日志;
- case 2:follower [1, 1, 2, 3, 3, 3],发现 follower 和 leader 的日志保持一致并且多出 [3, 3],根据 paper 中提到 “If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)”,不需要截断超出的 log(可能是网络延迟或故障,leader 实际上已经有了 [3, 3]
) ,当然截断超出的 log 也是正确的;
for i, entry := range args.Entries {
index := args.PrevLogIndex + 1 + i
if index >= len(rf.log) || rf.log[index].Term != entry.Term {
rf.log = append(rf.log[:index], args.Entries[i:]...)
break
}
}
Tip
paper 中图 2 说到
LastLogTerm > rf.log[len(rf.log)-1].TermLastLogTerm == rf.log[len(rf.log)-1].Term && LastLogIndex ≥ len(rf.log)-1
Part 3C: persistence¶
persistence
- 如果一个 server 挂掉后重启,需要利用持久化存储的数据恢复节点的状态;
- 持久化的内容是什么?
- 什么时候持久化?
持久化内容 ¶
votedFor:当前 Term 内收到本节点选票的 CandidateId;- 确保一个 Term 只能投一票;
- 如果没有持久化,节点重启后不知道自己是否投过票,可能导致在同一 Term 投票给另一个 Candidate,随后出现两个 leader;
currentTerm:服务器已知最新的 Term;- 防止一个节点在同一 Term 内投出两张票,或者旧 leader 重启时认为自己仍是当前 leader;
log:需要 log 来恢复状态机,并与其他节点进行日志匹配与同步;- 其他数据不需要持久化,因为可以通过心跳的方式逐步推理获取;
持久化时机 ¶
- 数据持久化到硬盘的开销远大于 RPC,所以在除开必要持久化外得越少越好;
- 所有修改
votedFor、currentTerm和log操作后,必须要持久化;
Part 3D: log compaction¶
log compaction
- Part 3C 中虽然实现了持久化,但是对于长期运行的 server 来说,持久化完整的 log 并不现实,因此日志压缩为 snapshot,对于 Raft 会丢弃快照之前的 log,可以大大减少持久化数据量并加快重启速度;
- 某个 follower 落后太多,导致 leader 已经丢弃了用于追加的 log,所以 leader 需要发送快照以及从该快照开始的 log;
- 既然会丢弃部分 log,那么这部分 log 的缺失会出现什么问题?
- 哪些节点需要压缩成 snapshot?
- 快照存储什么数据?快照是否需要持久化?
- 当 follower 发现自己的 log 已经落后于 leader,leader 如何通知 follower 使用快照进行替换?
日志截断 ¶
- 根据论文中的描述,当日志压缩时,需要进行日志截断,通俗来说就是只保留最后一个日志作为开头,而其他日志全部删除,但这样如果要恢复快照,就需要三个参数进行恢复:
snapshot:快照数据lastIncludedIndex: 日志中最高索引lastIncludedTerm: 日志中最高 Term
- 当存在日志截断后,Raft 节点中的各参数需要位置偏移,比如取
rf.log中的数据时,需要- lastIncludedIndex,而取rf.log的长度时,需要加+ lastIncludedIndex,既然如此,声明两个函数来操作:
// 使用 log 切片时使用的索引
func (rf *Raft) RealLogIndex(virtualLogIndex int) int {
return virtualLogIndex - rf.lastIncludedIndex
}
// 全局真实递增索引
func (rf *Raft) VirtualLogIndex(realLogIndex int) int {
return realLogIndex + rf.lastIncludedIndex
}
Snapshot()¶
接收应用层的快照请求,并截断 log 数组;
- 判断是否接受
snapshot; - 保存
snapshot; - 更新
lastIncludedIndex和lastIncludedTerm;
持久化 ¶
- 新增三个持久化内容,
lastIncludedIndex、lastIncludedTerm和snapshot;
InstallSnapshot RPC¶
当某个某个节点落后了
nextIndex - 1 < lastIncludedIndex,需要发送快照;
发送方 ¶
- 当 leader 发现 follower 要求回退的日志已经被截断时,触发 InstallSnapshot;
- InstallSnapshot 触发时机:
- leader 发送心跳之前判断 follower 是否落后于快照;
- leader 发送心跳后,如果发现日志冲突了,更改
rf.nextIndex时再判断 follower 是否落后于快照; - 只要
nextIndex < lastIncludedIndex,就要触发;
- 触发成功后,要重新设置
nextIndex和matchIndex; - 为了防止频繁触发 InstallSnapshot,可以先设置
rf.nextIndex,而不立刻go InstallSnapshot,交给下次心跳来触发;
接收方 ¶
- 如果
args.LastIncludedIndex处存在日志,那么需要日志截断并创建快照; - 如果
args.LastIncludedIndex处不存在日志,那么需要清空日志; - 更新
lastApplied和commitIndex; - 完成操作后,需要封装 ApplyMsg 并通过 applyCh 发送给应用层;
数组越界
- 由于加入了日志截断,所以有
RealLogIndex和VirtualLogIndex,还有lastIncludedIndex和 0 的区别,很可能导致书逐月节; - 例如:
- 新 leader 没有进行
nextIndex和matchIndex初始化 - 快速回退中
XIndex > lastIncludedIndex而不是 0;
- 新 leader 没有进行