Project Overview
Implement the leader and follower code to append new log entries.
Logic
- The leader needs to append new log entries (specifically, the entries the followers are missing) to the followers. Meanwhile, the leader should maintain its own state (nextIndex, matchIndex, commitIndex).
- The follower checks log consistency and appends the new entries when it receives an AppendEntries RPC.
- Apply committed commands to the state machine in the Make method.
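Before the implementation, here is a rough sketch of the types the steps below rely on. Treat the exact shape as my assumption; the field names follow the Raft paper and the Lab 2A code, so yours may differ slightly.

```go
// LogEntry is a single entry in the replicated log.
type LogEntry struct {
    Term    int         // term in which the entry was created
    Command interface{} // client command to apply to the state machine
}

// AppendEntriesArgs is sent by the leader to replicate entries (or as a heartbeat).
type AppendEntriesArgs struct {
    Term         int        // leader's term
    LeaderId     int        // so followers can redirect clients
    PrevLogIndex int        // index of the entry immediately preceding the new ones
    PrevLogTerm  int        // term of the entry at PrevLogIndex
    Entries      []LogEntry // entries to store (empty for heartbeat)
    LeaderCommit int        // leader's commitIndex
}

// AppendEntriesReply is the follower's response.
type AppendEntriesReply struct {
    Term    int  // the follower's currentTerm, so the leader can update itself
    Success bool // true if the follower's log matched PrevLogIndex/PrevLogTerm
}
```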
Implementation
Leader
Let's start with the Start method.
The user calls Start to append a new command to the Raft cluster. Note that only the leader is able to process this call, so return false if the server is not the leader.
Then the leader does the following to start the agreement:
- construct a LogEntry object
- append the new entry to its own log
- broadcast AppendEntries RPCs to the other servers (in a goroutine, since Start should return immediately)
- return the new entry's index, the current term, and true
```go
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    // Your code here (2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // check whether this server is the leader
    // if not, return false.
    if rf.state != LEADER {
        return -1, -1, false
    }
    // --- start the agreement ---
    // construct LogEntry
    entry := LogEntry{
        Term:    rf.currentTerm,
        Command: command,
    }
    // append to self's log
    rf.log = append(rf.log, entry)
    index := len(rf.log)
    term := rf.currentTerm
    // use a goroutine to sync to others, as this method should return immediately
    go rf.broadcastAppendEntries()
    return index, term, true
}
```
Let's dive into the broadcastAppendEntries method.
The first thing is to iterate over the peers.
```go
for i := range rf.peers {
    if i == rf.me {
        continue
    }
    // ...
}
```
For each peer, send an AppendEntries RPC:
- prepare the RPC args
- call rf.sendAppendEntries(server, args, reply)
- handle the reply: maintain state, including matchIndex, nextIndex, and commitIndex
Note that the time to update the commit index is when a successful reply (reply.Success == true) is received.
```go
go func(server int) {
    rf.mu.Lock()
    if rf.state != LEADER {
        rf.mu.Unlock()
        return
    }
    // next index
    nextIndex := rf.nextIndex[server]
    if nextIndex <= 0 {
        nextIndex = 1
    }
    // prevLogIndex and prevLogTerm
    prevLogIndex := nextIndex - 1
    prevLogTerm := 0
    if prevLogIndex > 0 {
        prevLogTerm = rf.log[prevLogIndex-1].Term
    }
    // entries to sync
    var entries []LogEntry
    if nextIndex <= len(rf.log) {
        entries = rf.log[nextIndex-1:]
    } else {
        entries = nil
    }
    // term and leaderCommit are captured from the enclosing
    // broadcastAppendEntries before the goroutines are spawned
    args := &AppendEntriesArgs{
        Term:         term,
        LeaderId:     rf.me,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        Entries:      entries,
        LeaderCommit: leaderCommit,
    }
    rf.mu.Unlock()
    reply := &AppendEntriesReply{}
    ok := rf.sendAppendEntries(server, args, reply)
    if !ok {
        return
    }
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // the state may have changed while the RPC was in flight
    if rf.state != LEADER || rf.currentTerm != term {
        return
    }
    // a newer term means we are stale: step down to follower
    if reply.Term > rf.currentTerm {
        rf.currentTerm = reply.Term
        rf.state = FOLLOWER
        rf.votedFor = -1
        return
    }
    if reply.Success {
        rf.matchIndex[server] = prevLogIndex + len(entries)
        rf.nextIndex[server] = rf.matchIndex[server] + 1
        rf.updateCommitIndex()
    } else {
        // log inconsistency: back up nextIndex and retry on the next broadcast
        if rf.nextIndex[server] > 1 {
            rf.nextIndex[server]--
        }
    }
}(i)
```
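For context, here is how I picture the enclosing broadcastAppendEntries tying the peer loop and the goroutine together. This is a sketch under my assumptions, and the underscore assignment merely stands in for the goroutine body shown above. The important detail is that term and leaderCommit are snapshotted once, under the lock, before the goroutines are spawned.

```go
func (rf *Raft) broadcastAppendEntries() {
    rf.mu.Lock()
    if rf.state != LEADER {
        rf.mu.Unlock()
        return
    }
    // snapshot the leader's term and commit index once so that
    // every per-peer goroutine works with a consistent view
    term := rf.currentTerm
    leaderCommit := rf.commitIndex
    rf.mu.Unlock()

    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go func(server int) {
            // body as shown above: build AppendEntriesArgs from term and
            // leaderCommit, send the RPC, then handle the reply
            _, _, _ = server, term, leaderCommit // placeholder for the elided body
        }(i)
    }
}
```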
When updating the commit index, we iterate in reverse order (from the last log index down to commitIndex), because committing a later index implies that all earlier indexes are committed as well.
Note that the leader may only commit entries from its own term, so when we encounter an entry from a different term we skip it and continue.
For each candidate index, we check whether it is replicated on a majority of servers. If so, we can advance commitIndex.
```go
func (rf *Raft) updateCommitIndex() {
    for N := len(rf.log); N > rf.commitIndex; N-- {
        // the leader can only commit entries from its own term
        if rf.log[N-1].Term != rf.currentTerm {
            continue
        }
        count := 1
        for i := range rf.peers {
            if i != rf.me && rf.matchIndex[i] >= N {
                count++
            }
        }
        if count > len(rf.peers)/2 {
            rf.commitIndex = N
            break
        }
    }
}
```
Follower
We left a TODO mark in the Lab 2A code. Now we will complete it.
First, check log consistency. PrevLogIndex and PrevLogTerm are used here.
```go
if args.PrevLogIndex > 0 {
    if len(rf.log) < args.PrevLogIndex {
        reply.Success = false
        return
    }
    if rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
        reply.Success = false
        rf.log = rf.log[:args.PrevLogIndex]
        return
    }
}
```
After that, handle conflicts. If an existing entry conflicts with a new one (same index but a different term), that entry and all subsequent entries are deleted.
```go
// while iterating over args.Entries: entry is the new entry, logIndex its log index
if rf.log[logIndex-1].Term != entry.Term {
    rf.log = rf.log[:logIndex-1]
```
Finally, append new log entries.
```go
rf.log = append(rf.log, args.Entries[i:]...)
break
```
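The fragments above use i, logIndex, and entry without showing the surrounding loop, so here is how I would assemble the entry-handling part of the handler. Treat it as a sketch of my understanding rather than the exact lab code.

```go
// inside the AppendEntries handler, after the consistency check has passed
for i, entry := range args.Entries {
    // 1-based index this entry will occupy in the follower's log
    logIndex := args.PrevLogIndex + 1 + i
    if logIndex <= len(rf.log) {
        if rf.log[logIndex-1].Term != entry.Term {
            // conflict: delete this entry and everything after it,
            // then take the leader's entries from here on
            rf.log = rf.log[:logIndex-1]
            rf.log = append(rf.log, args.Entries[i:]...)
            break
        }
        // same index and term: the entry is already present, keep scanning
        continue
    }
    // we ran past the end of our log: append the remaining new entries
    rf.log = append(rf.log, args.Entries[i:]...)
    break
}
reply.Success = true
```

Per the paper, the handler should also advance rf.commitIndex to the smaller of args.LeaderCommit and the index of the last new entry whenever args.LeaderCommit is larger; that is what lets the apply loop below make progress on followers.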
Make method
We need to apply committed commands to the state machine. We implement this by sending an ApplyMsg object on the applyCh channel.
The commands to apply range from rf.lastApplied + 1 to rf.commitIndex.
This is supposed to be done periodically, so we place the code in a for loop inside a long-running goroutine.
```go
// apply commands to the state machine
go func() {
    for !rf.killed() {
        rf.mu.Lock()
        if rf.lastApplied < rf.commitIndex {
            for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
                msg := ApplyMsg{
                    CommandValid: true,
                    Command:      rf.log[i-1].Command,
                    CommandIndex: i,
                }
                applyCh <- msg
            }
            rf.lastApplied = rf.commitIndex
        }
        rf.mu.Unlock()
        time.Sleep(10 * time.Millisecond)
    }
}()
```
Test
Passed.
```
$ go test -run 2B
Test (2B): basic agreement ...
  ... Passed --   0.6  3    16    4140    3
Test (2B): RPC byte count ...
  ... Passed --   1.4  3    48  113112   11
Test (2B): agreement despite follower disconnection ...
  ... Passed --   4.1  3    98   24735    7
Test (2B): no agreement if too many followers disconnect ...
  ... Passed --   3.4  5   208   42858    4
Test (2B): concurrent Start()s ...
  ... Passed --   0.5  3    18    4933    6
Test (2B): rejoin of partitioned leader ...
  ... Passed --   4.1  3   152   35343    4
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed --  23.3  5  2590 2101590  104
Test (2B): RPC counts aren't too high ...
  ... Passed --   2.0  3    60   16764   12
PASS
ok      github.com/alioth4j/6.824/src/raft      39.506s
```
Conclusion
Synchronization
- State checks need to be protected by the lock.
- We use lots of goroutines. Keep in mind that synchronization in the outer method does not cover the goroutines it spawns, so each goroutine must do its own locking. Between unlocking and re-locking, the state may change, so we have to check the state again after acquiring the lock.
What Should We Do?
I think this is one of the most challenging parts of this lab. The individual rules are scattered across the paper, and we have to think carefully about when to do something and what exactly to do.
Maintaining the state correctly is the crux of it.