Project Overview
Implement Raft leader election and heartbeats (AppendEntries RPCs with no log entries).
Objects
Figure 2 of the Raft paper gives a clear summary of the data structures, so they can be transcribed almost directly.
Raft
The fields of the Raft struct fall into four groups:
- metadata
- states
- locks
- support for periodic behavior (timer interval and channels)
| type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// state
	// just what it thinks it is
	state int

	// persistent state on all servers
	currentTerm int
	votedFor    int // set to -1 at beginning
	log         []LogEntry

	// volatile state on all servers
	commitIndex int
	lastApplied int

	// volatile state on leaders
	nextIndex  []int
	matchIndex []int

	// each electionTimeout is different, so it is not saved here
	heartbeatInterval      time.Duration
	resetElectionTimerChan chan struct{}
	done                   chan struct{}
}

type LogEntry struct {
	Term    int
	Command interface{}
}
|
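The state field holds the role this peer currently believes it has. A minimal sketch of how the roles might be declared is shown here. FOLLOWER is the name used in this post; CANDIDATE, LEADER, and the roleName helper are assumptions added for illustration.

```go
package main

import "fmt"

// Role values for the `state` field. FOLLOWER is the zero value, so a
// freshly constructed peer starts as a follower.
// CANDIDATE and LEADER are assumed names, not taken from the post.
const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

// roleName is a hypothetical helper that makes debug output readable.
func roleName(state int) string {
	switch state {
	case FOLLOWER:
		return "follower"
	case CANDIDATE:
		return "candidate"
	case LEADER:
		return "leader"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(roleName(FOLLOWER), roleName(CANDIDATE), roleName(LEADER))
}
```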
Args and Reply of RPC
Define the arguments and reply of each RPC, following Figure 2 of the paper.
| type RequestVoteArgs struct {
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}
|
Highlights of the Code Implementation
Start of Raft
- construct the object
- start election goroutine
- start heartbeat goroutine
- return
Note that Make() must return quickly, so any long-running work is started in goroutines.
| func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	// Your initialization code here (2A, 2B, 2C).
	rf := &Raft{
		peers:             peers,
		persister:         persister,
		me:                me,
		state:             FOLLOWER,
		currentTerm:       0,
		votedFor:          -1,
		log:               make([]LogEntry, 0),
		commitIndex:       0,
		lastApplied:       0,
		nextIndex:         make([]int, len(peers)),
		matchIndex:        make([]int, len(peers)),
		// 5 heartbeats per second; the tester allows at most 10 per second
		heartbeatInterval: 200 * time.Millisecond,
		// buffered with capacity 1 so that senders never block
		resetElectionTimerChan: make(chan struct{}, 1),
		done:                   make(chan struct{}),
	}
	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
	// start background goroutines
	go rf.electionTimerGoroutine()
	go rf.heartbeatGoroutine()
	return rf
}
|
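Make() starts rf.heartbeatGoroutine(), whose body is not shown in this post. A rough sketch of what such a loop could look like follows, assuming a ticker driven by heartbeatInterval and a done channel for shutdown. The function runHeartbeats and its isLeader and send parameters are hypothetical stand-ins for the state check and the AppendEntries broadcast.

```go
package main

import (
	"fmt"
	"time"
)

// runHeartbeats calls send once per interval while isLeader reports true,
// and returns when done is closed. The parameters are hypothetical
// stand-ins for a check of rf.state, an AppendEntries broadcast, and rf.done.
func runHeartbeats(interval time.Duration, isLeader func() bool, send func(), done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			// Followers and candidates keep ticking but send nothing.
			if isLeader() {
				send()
			}
		}
	}
}

func main() {
	done := make(chan struct{})
	sent := 0
	send := func() {
		sent++
		if sent == 3 {
			close(done) // stop the loop after three heartbeats
		}
	}
	runHeartbeats(10*time.Millisecond, func() bool { return true }, send, done)
	fmt.Println("heartbeats sent:", sent)
}
```

In the real goroutine the leadership check would take rf.mu, and each send would go out in its own goroutine so that a slow peer cannot delay the next tick.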
Timing
Use time.Sleep() in a for loop.
| for !rf.killed() {
	// pick a fresh random timeout each round so that peers rarely time out together
	electionTimeout := time.Duration(300+rand.Intn(150)) * time.Millisecond
	time.Sleep(electionTimeout)
	// do periodic things
}
|
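The Raft struct also carries resetElectionTimerChan and done, which a plain time.Sleep() cannot react to. As one possible way to combine them with the random timeout, here is a sketch built on select. The helper names waitForTimeout and randomElectionTimeout are assumptions made for this example.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomElectionTimeout returns a duration in [300ms, 450ms),
// the same range used in the loop above.
func randomElectionTimeout() time.Duration {
	return time.Duration(300+rand.Intn(150)) * time.Millisecond
}

// waitForTimeout (hypothetical helper) blocks until the timer is reset,
// the peer is shut down, or the timeout elapses, and reports which happened.
func waitForTimeout(timeout time.Duration, reset <-chan struct{}, done <-chan struct{}) string {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-reset:
		return "reset" // heartbeat or granted vote: start a new round
	case <-done:
		return "stopped" // peer was killed
	case <-timer.C:
		return "timeout" // nothing heard from a leader: start an election
	}
}

func main() {
	reset := make(chan struct{}, 1)
	done := make(chan struct{})

	reset <- struct{}{} // simulate a heartbeat that arrived earlier
	fmt.Println(waitForTimeout(randomElectionTimeout(), reset, done))

	fmt.Println(waitForTimeout(20*time.Millisecond, reset, done))

	close(done)
	fmt.Println(waitForTimeout(randomElectionTimeout(), reset, done))
}
```

The three calls print reset, timeout, and stopped in that order. The buffered channel of capacity 1 is what lets RPC handlers signal a reset with a non-blocking send.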
Starting Election
- add 1 to current term
- vote for self
- prepare RPC args and replies
- traverse all the peers and send RPC
- count votes
Pay attention to state changes.
Whenever the code is not holding the mutex, the state may change at any time. For instance, if reply.Term > rf.currentTerm, this Raft instance is no longer a candidate, but the other goroutines do not know that yet. So before acting on anything, first call rf.mu.Lock() and re-check the state.
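The election steps and the re-check under the lock can be sketched as a small self-contained simulation. This is not the code from the lab repository: the node type, the peerFunc callback that stands in for the RequestVote RPC, and the helpers grant, down, and newer are all assumptions. Unlike a real implementation, it waits for every peer to answer so that the example finishes deterministically.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

type voteReply struct {
	Term        int
	VoteGranted bool
}

// peerFunc stands in for one RequestVote RPC; ok is false if the call failed.
type peerFunc func(term, candidateId int) (reply voteReply, ok bool)

type node struct {
	mu          sync.Mutex
	me          int
	state       int
	currentTerm int
	votedFor    int
}

// startElection runs a single election round against peers (entry n.me is skipped).
func (n *node) startElection(peers []peerFunc) {
	n.mu.Lock()
	n.state = CANDIDATE
	n.currentTerm++
	n.votedFor = n.me
	term := n.currentTerm // remember the term this election belongs to
	n.mu.Unlock()

	votes := 1 // our own vote; guarded by n.mu below
	var wg sync.WaitGroup
	for i, call := range peers {
		if i == n.me {
			continue
		}
		wg.Add(1)
		go func(call peerFunc) {
			defer wg.Done()
			reply, ok := call(term, n.me) // no lock held during the RPC
			if !ok {
				return
			}
			n.mu.Lock()
			defer n.mu.Unlock()
			if reply.Term > n.currentTerm {
				// A newer term exists: step down.
				n.currentTerm = reply.Term
				n.votedFor = -1
				n.state = FOLLOWER
				return
			}
			// The state may have changed while the RPC was in flight.
			if n.state != CANDIDATE || n.currentTerm != term {
				return
			}
			if reply.VoteGranted {
				votes++
				if votes > len(peers)/2 {
					n.state = LEADER
				}
			}
		}(call)
	}
	wg.Wait()
}

func grant(term, _ int) (voteReply, bool) { return voteReply{Term: term, VoteGranted: true}, true }
func down(_, _ int) (voteReply, bool)     { return voteReply{}, false }
func newer(term, _ int) (voteReply, bool) { return voteReply{Term: term + 1}, true }

func main() {
	a := &node{me: 0, votedFor: -1}
	a.startElection([]peerFunc{nil, grant, down})
	fmt.Println(a.state == LEADER, a.currentTerm) // true 1

	b := &node{me: 0, votedFor: -1}
	b.startElection([]peerFunc{nil, newer, newer})
	fmt.Println(b.state == FOLLOWER, b.currentTerm) // true 2
}
```

The saved term is what makes the re-check reliable: a reply that arrives after the node has moved to another term or role is simply ignored.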
Receiving RPCs
RequestVote RPC
- lock
- check term
- log up-to-date check
- if the vote is granted, reset the election timer
| func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.state = FOLLOWER
	}
	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		return
	}
	// up-to-date check
	lastLogIndex := 0
	lastLogTerm := 0
	if len(rf.log) > 0 {
		lastLogIndex = len(rf.log)
		lastLogTerm = rf.log[lastLogIndex-1].Term
	}
	logUpToDate := args.LastLogTerm > lastLogTerm ||
		(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)
	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && logUpToDate {
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		// reset election timer
		select {
		case rf.resetElectionTimerChan <- struct{}{}:
		default:
		}
	} else {
		reply.VoteGranted = false
	}
}
|
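The up-to-date check is the rule from Section 5.4.1 of the paper: the log with the later last term wins, and if the last terms are equal, the longer log wins. Pulling it into a pure function makes it easy to try a few cases. The function name candidateUpToDate is an assumption for this sketch.

```go
package main

import "fmt"

// candidateUpToDate reports whether the candidate's log is at least as
// up-to-date as the voter's, comparing last term first and last index second.
func candidateUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex int) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm
	}
	return candLastIndex >= myLastIndex
}

func main() {
	fmt.Println(candidateUpToDate(3, 1, 2, 5)) // true: later last term beats a longer log
	fmt.Println(candidateUpToDate(2, 4, 2, 5)) // false: same term, shorter log
	fmt.Println(candidateUpToDate(0, 0, 0, 0)) // true: two empty logs are equally up-to-date
}
```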
AppendEntries RPC
- lock
- term check
- reset election timeout
- log consistency check
- todo for lab2b: append new entries
- update commit index
| func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// reject RPCs from a stale leader
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}
	// set the reply term once so that every return path below reports it
	reply.Term = rf.currentTerm
	// At most one leader can exist in a given term, so the sender is the
	// legitimate leader and this peer must be a follower.
	rf.state = FOLLOWER
	select {
	case rf.resetElectionTimerChan <- struct{}{}:
	default:
	}
	// log consistency check: the entry at PrevLogIndex (1-based) must exist
	// and carry PrevLogTerm
	if args.PrevLogIndex > 0 {
		if len(rf.log) < args.PrevLogIndex {
			reply.Success = false
			return
		}
		if rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
			reply.Success = false
			// drop the conflicting entry and everything after it
			rf.log = rf.log[:args.PrevLogIndex-1]
			return
		}
	}
	// TODO append new entries
	// update commit index
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = min(args.LeaderCommit, len(rf.log))
	}
	reply.Success = true
}
|
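The log consistency check depends on mapping Raft's 1-based log indices onto a 0-based Go slice, which is easy to get wrong by one. A small sketch of the check in isolation, assuming the same LogEntry type as above; the function name prevEntryMatches is hypothetical.

```go
package main

import "fmt"

type LogEntry struct {
	Term    int
	Command interface{}
}

// prevEntryMatches reports whether log holds an entry at the 1-based index
// prevLogIndex whose term equals prevLogTerm. Index 0 means "before the
// first entry" and always matches.
func prevEntryMatches(log []LogEntry, prevLogIndex, prevLogTerm int) bool {
	if prevLogIndex == 0 {
		return true
	}
	if prevLogIndex > len(log) {
		return false // the follower's log is too short
	}
	// Raft index i lives at slice position i-1.
	return log[prevLogIndex-1].Term == prevLogTerm
}

func main() {
	log := []LogEntry{{Term: 1}, {Term: 1}, {Term: 2}}
	fmt.Println(prevEntryMatches(log, 0, 0)) // true: nothing precedes the first entry
	fmt.Println(prevEntryMatches(log, 3, 2)) // true: entry 3 has term 2
	fmt.Println(prevEntryMatches(log, 3, 1)) // false: term mismatch at entry 3
	fmt.Println(prevEntryMatches(log, 4, 2)) // false: entry 4 does not exist
}
```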
Test
Passed.
| $ go test -run 2A
Test (2A): initial election ...
... Passed -- 3.0 3 30 7594 0
Test (2A): election after network failure ...
... Passed -- 4.5 3 80 14810 0
PASS
ok github.com/alioth4j/6.824/src/raft 7.493s
|
Conclusion
Thanks to the Paper
Most of lab 2A maps directly onto the Raft paper, especially Figure 2.
Everything Lies in State
- The structs store state.
- Synchronization protects shared state.
- Check the state first, then act accordingly.
- RPCs exchange state between Raft instances.
- Compare states and update the current state.