6.5840 Lab 3: Raft


Part A Leader Election

Each server in a Raft cluster is in one of three states: Follower, Candidate, or Leader.

Follower

  • Default state: every node starts in the Follower state.
  • Timeout trigger: a Follower waits for heartbeats from the Leader. If no heartbeat or log-append request (AppendEntries RPC) arrives within the election timeout (usually a random value between 150ms and 300ms), the Follower converts to Candidate.

Candidate

  • Self-nomination: on converting to Candidate, a node increments its term and votes for itself.
  • Requesting votes: the Candidate sends RequestVote RPCs to the other nodes.
    • Case 1: after winning votes from a majority of nodes, the Candidate becomes Leader.
    • Case 2: if it fails to gather enough votes, or the election timeout fires again while it is waiting, it starts a new election timeout (electionTimeout) and runs another election.
    • Case 3: if during the wait it learns that another node has become Leader (by receiving that Leader's AppendEntries), the Candidate immediately converts back to Follower.

Leader

  • Heartbeats: once a node becomes Leader, it continuously sends empty AppendEntries RPCs to all Followers as heartbeats. On receiving a heartbeat from the Leader, a Follower resets its election timeout.
  • Maintaining leadership: the heartbeat mechanism lets Followers keep confirming that a Leader exists; once a Follower times out, it re-enters the election flow (see the loop sketch below).
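A minimal sketch of such a heartbeat/replication loop, assuming a `Leader` role constant and a `startReplication` helper (both names are illustrative):

	// One replication/heartbeat round per tick, for as long as this
	// peer stays Leader in the given term.
	func (rf *Raft) startReplicationTimer(term int) {
		for !rf.killed() {
			rf.mu.Lock()
			contextLost := rf.role != Leader || rf.currentTerm != term
			rf.mu.Unlock()
			if contextLost {
				return // stepped down or moved to a newer term
			}

			rf.startReplication(term) // empty AppendEntries double as heartbeats

			// The heartbeat interval must be well below the election timeout.
			time.Sleep(100 * time.Millisecond)
		}
	}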

Part A mostly comes down to comparing terms; for now you only need to follow the peer with the larger term, and the log check is added later.
For example, on receiving a RequestVote RPC, check whether the Candidate's term is at least our current term. If it is smaller, reject the vote; on seeing our higher term in the reply, the Candidate will step down and follow us. If it is at least our term, check whether we have already voted in this term, granting the vote first-come-first-served.
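A sketch of that handler, assuming Figure 2's field names and hypothetical helpers `becomeFollower` and `resetElectionTimer` (the Part B up-to-date check is omitted here):

	func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
		rf.mu.Lock()
		defer rf.mu.Unlock()

		reply.Term = rf.currentTerm
		reply.VoteGranted = false

		// Candidate's term is stale: reject. Seeing our higher term in
		// the reply, the candidate will step down to follower.
		if args.Term < rf.currentTerm {
			return
		}
		// A higher term always wins: adopt it and become follower.
		if args.Term > rf.currentTerm {
			rf.becomeFollower(args.Term) // also clears votedFor
		}
		// First-come-first-served within this term.
		if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
			rf.votedFor = args.CandidateId
			reply.VoteGranted = true
			rf.resetElectionTimer()
		}
	}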

Why the election timeout is randomized

The election timeout is usually a random value between 150ms and 300ms, to prevent nodes from converting to Candidate and voting for themselves at the same time. If all nodes started an election simultaneously, they would split the vote, the election would fail, and everyone would enter another timeout round. Randomizing the timeout reduces the chance of such collisions and makes elections faster and more stable.
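For example (a sketch; needs math/rand and time, and the exact range is up to you as long as it comfortably exceeds the heartbeat interval):

	// randomElectionTimeout returns a fresh random timeout in [150ms, 300ms).
	func randomElectionTimeout() time.Duration {
		return time.Duration(150+rand.Intn(150)) * time.Millisecond
	}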

This mechanism lets Raft quickly and efficiently elect a single Leader in the cluster, keeping the cluster consistent and stable.

Think carefully about state transitions and the checks around them. For example, when a peer's vote reply arrives, verify that the current term has not changed and that this node is still a Candidate before counting the vote.
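For example, the goroutine that collects a vote reply might re-validate like this (a sketch; `votes` is a counter shared by this election's goroutines and guarded by rf.mu, and `becomeFollower`/`becomeLeader` are hypothetical helpers):

	// Inside the per-peer goroutine, after sendRequestVote returns:
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// A higher term in the reply means we are stale: step down.
	if reply.Term > rf.currentTerm {
		rf.becomeFollower(reply.Term)
		return
	}
	// Drop stale replies: the election we started may already be over.
	if rf.role != Candidate || rf.currentTerm != args.Term {
		return
	}
	if reply.VoteGranted {
		votes++
		if votes > len(rf.peers)/2 {
			rf.becomeLeader()
		}
	}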

Part B Log

This part mainly adds the log replication logic.

AppendEntries
reply:

	// 2. Reply false if log doesn't contain an entry at prevLogIndex
	// whose term matches prevLogTerm (§5.3)
	
	// 3. If an existing entry conflicts with a new one (same index
	// but different terms), delete the existing entry and all that
	// follow it (§5.3)
	// 4. Append any new entries not already in the log
	
	// 5. If leaderCommit > commitIndex, set commitIndex =
	// min(leaderCommit, index of last new entry)

send:

	// If last log index ≥ nextIndex for a follower: send
	// AppendEntries RPC with log entries starting at nextIndex
	// If successful: update nextIndex and matchIndex for
	// follower (§5.3)
	// If AppendEntries fails because of log inconsistency:
	// decrement nextIndex and retry (§5.3)
 
When decrementing nextIndex here, check that the term has not changed; keep decrementing only while it is unchanged.
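In the reply handler that means something like (a sketch, prior to the Part C fast-backup optimization):

	// After the AppendEntries RPC returns:
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// A higher term in the reply means we are stale: step down.
	if reply.Term > rf.currentTerm {
		rf.becomeFollower(reply.Term) // hypothetical helper
		return
	}
	// Ignore the reply if we are no longer leader in the term we sent it.
	if rf.role != Leader || rf.currentTerm != args.Term {
		return
	}
	if !reply.Success {
		rf.nextIndex[peer]-- // back up one entry; retry on the next round
		return
	}
	// Success: advance bookkeeping past what we just sent.
	rf.matchIndex[peer] = args.PrevLogIndex + len(args.Entries)
	rf.nextIndex[peer] = rf.matchIndex[peer] + 1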
 
 
	// If there exists an N such that N > commitIndex, a majority
	// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
	// set commitIndex = N (§5.3, §5.4).
 
To find the majority matchIndex here, you can sort a copy of the array and take the median.
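A sketch (needs sort; assumes rf.matchIndex[rf.me] is kept up to date, as in the loop shown later):

	// majorityMatchIndex returns the highest index known to be replicated
	// on a majority of peers: sort a copy and take the lower median.
	func (rf *Raft) majorityMatchIndex() int {
		tmp := make([]int, len(rf.matchIndex))
		copy(tmp, rf.matchIndex)
		sort.Ints(tmp)
		// With the slice in ascending order, a majority of peers have
		// matchIndex >= tmp[(len(tmp)-1)/2].
		return tmp[(len(tmp)-1)/2]
	}

Remember Figure 2's caveat: only set commitIndex = N when rf.log[N].Term == rf.currentTerm.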

RequestVote
reply:

 
	// If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote
 
 
Below is the logic for the up-to-date check.
 
Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs.
 
> 1. If the logs have last entries with different terms, then the log with the later term is more up-to-date.
> 2. If the logs end with the same term, then whichever log is longer is more up-to-date.
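A direct translation of that rule (a sketch assuming a dummy entry at index 0, consistent with the code elsewhere in this post):

	// isLogUpToDate reports whether a candidate's log, described by the
	// index and term of its last entry, is at least as up-to-date as ours.
	func (rf *Raft) isLogUpToDate(candLastIndex, candLastTerm int) bool {
		lastIndex := len(rf.log) - 1
		lastTerm := rf.log[lastIndex].Term
		if candLastTerm != lastTerm {
			return candLastTerm > lastTerm // later term wins
		}
		return candLastIndex >= lastIndex // same term: longer log wins
	}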

Problem

The code below triggered a data race.
The root cause is that Entries was given a slice that aliases rf.log's backing array. rf.log is only safe to touch under the mutex, but the goroutine sending the RPC gob-encodes args.Entries without holding the lock, while other code concurrently writes to rf.log; whenever a write lands in the same backing array that the stale slice still references, the race fires. The fix is to copy the entries into a fresh slice before releasing the lock.

	rf.mu.Lock()
	defer rf.mu.Unlock()
	...
 
	for peer := range rf.peers {
		if peer == rf.me {
			rf.matchIndex[peer] = len(rf.log) - 1
			rf.nextIndex[peer] = len(rf.log)
			continue
		}
 
		prevLogIndex := rf.nextIndex[peer] - 1
		prevLogTerm := rf.log[prevLogIndex].Term
 
		args := &AppendEntriesArgs{
			Term:         savedCurrentTerm,
			LeaderId:     rf.me,
			PrevLogIndex: prevLogIndex,
			PrevLogTerm:  prevLogTerm,
			Entries:      rf.log[prevLogIndex+1:], // racy: aliases rf.log's backing array
			LeaderCommit: rf.commitIndex,
		}
		Debug(dInfo, rf.me, rf.currentTerm, "requestReplicateToPeer %d args: %v", peer, args)
		go requestReplicateToPeer(peer, args)
	}
WARNING: DATA RACE
Write at 0x00c000657988 by goroutine 9675:
  runtime.slicecopy()
      /usr/lib/go-1.22/src/runtime/slice.go:325 +0x0
  6.5840/raft.(*Raft).AppendEntries()
      /home/ultraman/Desktop/workspace/MIT6.824/src/raft/raft_replication.go:52 +0x4b8
  runtime.call32()
      /usr/lib/go-1.22/src/runtime/asm_amd64.s:771 +0x42
  reflect.Value.Call()
      /usr/lib/go-1.22/src/reflect/value.go:380 +0xb5
  6.5840/labrpc.(*Service).dispatch()
      /home/ultraman/Desktop/workspace/MIT6.824/src/labrpc/labrpc.go:506 +0x524
  6.5840/labrpc.(*Server).dispatch()
      /home/ultraman/Desktop/workspace/MIT6.824/src/labrpc/labrpc.go:430 +0x24e
  6.5840/labrpc.(*Network).processReq.func1()
      /home/ultraman/Desktop/workspace/MIT6.824/src/labrpc/labrpc.go:240 +0x9c
 
Previous read at 0x00c000657988 by goroutine 9682:
  reflect.Value.Int()
      /usr/lib/go-1.22/src/reflect/value.go:1467 +0x99
  encoding/gob.encInt()
      /usr/lib/go-1.22/src/encoding/gob/encode.go:188 +0x73
  encoding/gob.(*Encoder).encodeStruct()
      /usr/lib/go-1.22/src/encoding/gob/encode.go:328 +0x58e
  encoding/gob.encOpFor.func4()
      /usr/lib/go-1.22/src/encoding/gob/encode.go:546 +0xcb
  encoding/gob.(*Encoder).encodeArray()
      /usr/lib/go-1.22/src/encoding/gob/encode.go:351 +0x893
  encoding/gob.encOpFor.func1()
      /usr/lib/go-1.22/src/encoding/gob/encode.go:516 +0x212
  encoding/gob.(*Encoder).encodeStruct()
      /usr/lib/go-1.22/src/encoding/gob/encode.go:328 +0x58e
  encoding/gob.(*Encoder).encode()
      /usr/lib/go-1.22/src/encoding/gob/encode.go:666 +0x2a8
  encoding/gob.(*Encoder).EncodeValue()
      /usr/lib/go-1.22/src/encoding/gob/encoder.go:251 +0x78a
  encoding/gob.(*Encoder).Encode()
      /usr/lib/go-1.22/src/encoding/gob/encoder.go:176 +0x57
  6.5840/labgob.(*LabEncoder).Encode()
      /home/ultraman/Desktop/workspace/MIT6.824/src/labgob/labgob.go:34 +0x5b
  6.5840/labrpc.(*ClientEnd).Call()
      /home/ultraman/Desktop/workspace/MIT6.824/src/labrpc/labrpc.go:93 +0x1e4
  6.5840/raft.(*Raft).sendAppendEntries()
      /home/ultraman/Desktop/workspace/MIT6.824/src/raft/raft_replication.go:70 +0x10e
  6.5840/raft.(*Raft).startReplication.func1()
      /home/ultraman/Desktop/workspace/MIT6.824/src/raft/raft_replication.go:88 +0x8e
  6.5840/raft.(*Raft).startReplication.gowrap2()
      /home/ultraman/Desktop/workspace/MIT6.824/src/raft/raft_replication.go:169 +0x4f
 
Goroutine 9675 (running) created at:
  6.5840/labrpc.(*Network).processReq()
      /home/ultraman/Desktop/workspace/MIT6.824/src/labrpc/labrpc.go:239 +0x28a
  6.5840/labrpc.MakeNetwork.func1.gowrap1()
      /home/ultraman/Desktop/workspace/MIT6.824/src/labrpc/labrpc.go:157 +0x9c
 
Goroutine 9682 (running) created at:
  6.5840/raft.(*Raft).startReplication()
      /home/ultraman/Desktop/workspace/MIT6.824/src/raft/raft_replication.go:169 +0x9f2
  6.5840/raft.(*Raft).startReplicationTimer()
      /home/ultraman/Desktop/workspace/MIT6.824/src/raft/raft_replication.go:76 +0x58
  6.5840/raft.(*Raft).becomeLeader.gowrap1()
      /home/ultraman/Desktop/workspace/MIT6.824/src/raft/raft.go:277 +0x44
==================

The fix is to copy:

		entries := make([]LogEntry, len(rf.log[prevLogIndex+1:]))
		copy(entries, rf.log[prevLogIndex+1:])
 
		args := &AppendEntriesArgs{
			// ...other fields unchanged...
			Entries:      entries, // private copy: no aliasing of rf.log
		}

Part C Persistence

(raft) ultraman in ~/Desktop/workspace/MIT6.824/src/raft on lab3_2 ● λ VERBOSE=1 python dtest.py -p 8 -r 3C -n 100
Running with the race detector

┏━━━━━━┳━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Test ┃ Failed ┃ Total ┃          Time ┃
┡━━━━━━╇━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━━━━┩
│ 3C   │      0 │   100 │ 134.67 ± 4.86 │

Persistence: only currentTerm, votedFor, and log need to be saved.
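A minimal sketch, following the commented example in the lab skeleton (Part D later supplies the snapshot as the second argument to Save):

	// persist serializes the fields that must survive a crash and hands
	// them to the persister.
	func (rf *Raft) persist() {
		w := new(bytes.Buffer)
		e := labgob.NewEncoder(w)
		e.Encode(rf.currentTerm)
		e.Encode(rf.votedFor)
		e.Encode(rf.log)
		rf.persister.Save(w.Bytes(), nil) // no snapshot until Part D
	}

Call it, while holding the lock, at every point where one of the three fields changes, and mirror it with readPersist on restart.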

Up to now, backing up a follower's nextIndex went one entry at a time, waiting for the next replication round between attempts.

There is an optimization:
the core idea is to carry information about the follower's own log in the AppendEntries response.

  ConflictIndex int
  ConflictTerm  int

If the follower's log is too short, set ConflictIndex to len(log) and ConflictTerm to an INVALID sentinel.

Otherwise, set ConflictTerm to the term of the conflicting entry, and ConflictIndex to the index of the first log entry in that term.

	// Reply false if log doesn’t contain an entry at prevLogIndex
	// whose term matches prevLogTerm (§5.3)
	if args.PrevLogIndex >= len(rf.log) {
		reply.ConflictIndex = len(rf.log)
		reply.ConflictTerm = InvalidTerm
		Debug(dWarn, rf.me, rf.currentTerm, "Current peer doesn't contain prevLogIndex(%d)", args.PrevLogIndex)
		return
	}
	if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
		reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
		reply.ConflictIndex = rf.firstLogIndexInTerm(reply.ConflictTerm)
		Debug(dWarn, rf.me, rf.currentTerm, "Current peer's log at prevLogIndex(%d) doesn't match prevLogTerm(%d)", args.PrevLogIndex, args.PrevLogTerm)
		return
	}

The paper itself notes:

> In practice, we doubt this optimization is necessary, since failures happen infrequently and it is unlikely that there will be many inconsistent entries.

The lab hints, however, recommend it:

> You will probably need the optimization that backs up nextIndex by more than one entry at a time. Look at the [extended Raft paper](http://nil.csail.mit.edu/6.5840/2024/papers/raft-extended.pdf) starting at the bottom of page 7 and top of page 8 (marked by a gray line). The paper is vague about the details; you will need to fill in the gaps. One possibility is to have a rejection message include:
 
    XTerm:  term in the conflicting entry (if any)
    XIndex: index of first entry with that term (if any)
    XLen:   log length
 
Then the leader's logic can be something like:
 
  Case 1: leader doesn't have XTerm:
    nextIndex = XIndex
  Case 2: leader has XTerm:
    nextIndex = leader's last entry for XTerm
  Case 3: follower's log is too short:
    nextIndex = XLen
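Mapped onto the ConflictIndex/ConflictTerm fields above, the leader side can look roughly like this (a sketch; lastLogIndexInTerm is a hypothetical mirror of firstLogIndexInTerm, returning -1 when the term is absent):

	// In the AppendEntries reply handler, replacing the one-step decrement:
	if !reply.Success {
		if reply.ConflictTerm == InvalidTerm {
			// Case 3: follower's log is too short, jump to its end.
			rf.nextIndex[peer] = reply.ConflictIndex
		} else if idx := rf.lastLogIndexInTerm(reply.ConflictTerm); idx != -1 {
			// Case 2: leader has entries in ConflictTerm, try just past the last one.
			rf.nextIndex[peer] = idx + 1
		} else {
			// Case 1: leader has no entry in ConflictTerm, skip the follower's whole term.
			rf.nextIndex[peer] = reply.ConflictIndex
		}
		return
	}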

Part D Log Compaction

Sending InstallSnapshot

When the leader finds that a follower's log has fallen far behind its own, that is, when building an AppendEntries it sees the peer's PrevLogIndex < snapLastIdx, it should send an InstallSnapshot RPC instead.
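A sketch of that branch inside the replication loop (snapLastIdx/snapLastTerm, the snapshot field, and the helper name are illustrative, not fixed API):

		prevLogIndex := rf.nextIndex[peer] - 1
		if prevLogIndex < rf.snapLastIdx {
			// The entries this follower needs were compacted away:
			// fall back to InstallSnapshot instead of AppendEntries.
			args := &InstallSnapshotArgs{
				Term:              rf.currentTerm,
				LeaderId:          rf.me,
				LastIncludedIndex: rf.snapLastIdx,
				LastIncludedTerm:  rf.snapLastTerm,
				Snapshot:          rf.snapshot,
			}
			go rf.sendInstallSnapshotToPeer(peer, args)
			continue
		}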

As before, the main thing is to re-check state every time the lock is reacquired, e.g. whether the current term or role has changed, before acting on a reply (the same pattern as the vote-reply check in Part A).
