MIT 6.824 Lab2 Raft Part B

Raft code implementation in Golang

Project Overview

Implement the leader and follower code to append new log entries

Logic

  1. The leader needs to replicate new log entries (specifically, the entries each follower is missing) to the followers, and maintain its per-follower state while doing so.
  2. The follower handles incoming AppendEntries RPCs: it checks log consistency, resolves conflicts, and appends the new entries.
  3. Committed commands are applied to the state machine in a background loop started in the Make method.

Implementation

Leader

Let’s start with the Start method.

The user calls Start to append a new command to the Raft cluster. Note that only the leader can process this call, so we return false if the server is not the leader.

Then the leader does the following to start the agreement:

  1. construct a LogEntry object (see the sketch below)
  2. append the new entry to its own log
  3. broadcast AppendEntries RPCs to the other servers (in a goroutine, since Start must return immediately)
  4. return the new entry’s index, the current term, and true
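
The LogEntry type itself is not shown in this post; presumably it is a small struct along these lines (a minimal sketch, with field names matching the code below):

type LogEntry struct {
	Term    int         // term in which the leader created the entry
	Command interface{} // client command to apply to the state machine
}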
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	// Your code here (2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// check whether this server is the leader
	// if not, return false.
	if rf.state != LEADER {
		return -1, -1, false
	}

	// --- start the agreement ---
	// construct LogEntry
	entry := LogEntry{
		Term:    rf.currentTerm,
		Command: command,
	}

	// append to self's log
	rf.log = append(rf.log, entry)

	index := len(rf.log)
	term := rf.currentTerm

	// use goroutine to sync to others, as this method should return immediately
	go rf.broadcastAppendEntries()

	return index, term, true
}

Let’s dive into the broadcastAppendEntries method.

The first thing is to iterate over the peers, skipping ourselves.

for i := range rf.peers {
	if i == rf.me {
		continue
	}
	// ...
}

Then send an AppendEntries RPC to each of them:

  • prepare the RPC args (the assumed struct definitions are sketched below)
  • call rf.sendAppendEntries(server, args, reply)
  • handle the reply: maintain state, including matchIndex, nextIndex and commitIndex

Note that the commit index is only updated upon receiving a successful reply (reply.Success == true).
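
The argument and reply types carry the AppendEntries fields from Figure 2 of the Raft paper; presumably they were defined back in Lab 2A and look roughly like this sketch (field names taken from the code below):

type AppendEntriesArgs struct {
	Term         int        // leader's term
	LeaderId     int        // so followers can redirect clients
	PrevLogIndex int        // index of the entry immediately preceding the new ones
	PrevLogTerm  int        // term of the entry at PrevLogIndex
	Entries      []LogEntry // entries to replicate (empty for heartbeats)
	LeaderCommit int        // leader's commitIndex
}

type AppendEntriesReply struct {
	Term    int  // follower's currentTerm, so the leader can update itself
	Success bool // true if the follower's log matched PrevLogIndex/PrevLogTerm
}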

		go func(server int) {
			rf.mu.Lock()
			if rf.state != LEADER {
				rf.mu.Unlock()
				return
			}

			// next index
			nextIndex := rf.nextIndex[server]
			if nextIndex <= 0 {
				nextIndex = 1
			}
			// prevLogIndex and prevLogTerm
			prevLogIndex := nextIndex - 1
			prevLogTerm := 0
			if prevLogIndex > 0 {
				prevLogTerm = rf.log[prevLogIndex-1].Term
			}
			// entries to sync; copy them so we don't share rf.log's
			// backing array after releasing the lock
			var entries []LogEntry
			if nextIndex <= len(rf.log) {
				entries = make([]LogEntry, len(rf.log)-(nextIndex-1))
				copy(entries, rf.log[nextIndex-1:])
			}
			args := &AppendEntriesArgs{
				Term:         term,
				LeaderId:     rf.me,
				PrevLogIndex: prevLogIndex,
				PrevLogTerm:  prevLogTerm,
				Entries:      entries,
				LeaderCommit: leaderCommit,
			}
			rf.mu.Unlock()

			reply := &AppendEntriesReply{}
			ok := rf.sendAppendEntries(server, args, reply)
			if !ok {
				return
			}

			rf.mu.Lock()
			defer rf.mu.Unlock()
			if rf.state != LEADER || rf.currentTerm != term {
				return
			}
			if reply.Term > rf.currentTerm {
				rf.currentTerm = reply.Term
				rf.state = FOLLOWER
				rf.votedFor = -1
				return
			}
			if reply.Success {
				rf.matchIndex[server] = prevLogIndex + len(entries)
				rf.nextIndex[server] = rf.matchIndex[server] + 1
				rf.updateCommitIndex()
			} else {
				if rf.nextIndex[server] > 1 {
					rf.nextIndex[server]--
				}
			}
		}(i)
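
The snippet above refers to term and leaderCommit, which are not declared inside the goroutine; presumably the enclosing broadcastAppendEntries snapshots them under the lock before the loop, roughly like this sketch (per-follower details elided):

func (rf *Raft) broadcastAppendEntries() {
	rf.mu.Lock()
	if rf.state != LEADER {
		rf.mu.Unlock()
		return
	}
	// snapshot these under the lock; the per-follower goroutines capture them
	term := rf.currentTerm
	leaderCommit := rf.commitIndex
	rf.mu.Unlock()

	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		go func(server int) {
			rf.mu.Lock()
			// ...compute nextIndex, prevLogIndex, prevLogTerm and entries
			// exactly as in the snippet above, then build the args
			args := &AppendEntriesArgs{
				Term:         term,
				LeaderId:     rf.me,
				LeaderCommit: leaderCommit,
				// PrevLogIndex, PrevLogTerm and Entries filled in as shown above
			}
			rf.mu.Unlock()

			reply := &AppendEntriesReply{}
			if rf.sendAppendEntries(server, args, reply) {
				// ...handle the reply as shown above
			}
		}(i)
	}
}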

When updating the commit index, we iterate in reverse order (from the last log index down to commitIndex), because the commitment of a later index implies that all earlier indexes are committed as well.

Note that the leader may only commit entries from its own term, so when we encounter an entry from a different term we skip it and continue.

For each candidate index, we check whether it is replicated on a majority of servers. If so, we can advance commitIndex.

func (rf *Raft) updateCommitIndex() {
	for N := len(rf.log); N > rf.commitIndex; N-- {
		// the leader can only commit entries in self's term
		if rf.log[N-1].Term != rf.currentTerm {
			continue
		}

		count := 1
		for i := range rf.peers {
			if i != rf.me && rf.matchIndex[i] >= N {
				count++
			}
		}
		if count > len(rf.peers)/2 {
			rf.commitIndex = N
			break
		}
	}
}

Follower

We left a TODO in the AppendEntries handler back in Lab 2A. Now we will complete it.

First, check log consistency. PrevLogIndex and PrevLogTerm are used here.

	if args.PrevLogIndex > 0 {
		// the follower's log is too short to contain PrevLogIndex
		if len(rf.log) < args.PrevLogIndex {
			reply.Success = false
			return
		}
		// term mismatch at PrevLogIndex: reject so the leader backs off,
		// and drop everything after the mismatching entry
		if rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
			reply.Success = false
			rf.log = rf.log[:args.PrevLogIndex]
			return
		}
	}

After that, handle conflicts. If an existing entry conflicts with a new one (same index but a different term), that entry and all entries after it are deleted; a combined sketch of the whole loop is given after the two fragments below.

if rf.log[logIndex-1].Term != entry.Term {
	rf.log = rf.log[:logIndex-1]

Finally, append new log entries.

rf.log = append(rf.log, args.Entries[i:]...)
break
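
Putting the two fragments together, the loop over args.Entries looks roughly like this (a sketch, assuming a 1-indexed log stored in a 0-indexed slice and the variable names used above):

for i, entry := range args.Entries {
	logIndex := args.PrevLogIndex + 1 + i
	if logIndex <= len(rf.log) {
		// an entry already exists at this index
		if rf.log[logIndex-1].Term != entry.Term {
			// conflict: delete it and everything after it, then append the rest
			rf.log = rf.log[:logIndex-1]
			rf.log = append(rf.log, args.Entries[i:]...)
			break
		}
		// same term: the entry is already present, keep scanning
	} else {
		// past the end of our log: append the remaining entries
		rf.log = append(rf.log, args.Entries[i:]...)
		break
	}
}

The handler presumably also advances rf.commitIndex using args.LeaderCommit (to the minimum of LeaderCommit and the last log index), which is what the apply loop in the Make section below relies on.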

Make method

We need to apply committed commands to the state machine. We implement this by sending an ApplyMsg object on the applyCh channel.

The commands to apply range from rf.lastApplied + 1 to rf.commitIndex.

This needs to happen periodically, so we place the code in a loop running in a background goroutine.

	// apply commands to state machine
	go func() {
		for !rf.killed() {
			rf.mu.Lock()
			if rf.lastApplied < rf.commitIndex {
				for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
					msg := ApplyMsg{
						CommandValid: true,
						Command:      rf.log[i-1].Command,
						CommandIndex: i,
					}
					applyCh <- msg
				}
				rf.lastApplied = rf.commitIndex
			}
			rf.mu.Unlock()
			time.Sleep(10 * time.Millisecond)
		}
	}()

Test

Passed.

$ go test -run 2B
Test (2B): basic agreement ...
  ... Passed --   0.6  3   16    4140    3
Test (2B): RPC byte count ...
  ... Passed --   1.4  3   48  113112   11
Test (2B): agreement despite follower disconnection ...
  ... Passed --   4.1  3   98   24735    7
Test (2B): no agreement if too many followers disconnect ...
  ... Passed --   3.4  5  208   42858    4
Test (2B): concurrent Start()s ...
  ... Passed --   0.5  3   18    4933    6
Test (2B): rejoin of partitioned leader ...
  ... Passed --   4.1  3  152   35343    4
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed --  23.3  5 2590 2101590  104
Test (2B): RPC counts aren't too high ...
  ... Passed --   2.0  3   60   16764   12
PASS
ok  	github.com/alioth4j/6.824/src/raft	39.506s

Conclusion

Synchronization

  • Checking and updating state requires synchronization.
  • We use lots of goroutines. Note that a lock held by the outer function does not protect the goroutines it spawns, so each goroutine must acquire the lock itself. Between unlocking and re-locking, the state may change, so we have to re-check the state (e.g., that we are still the leader in the same term) after acquiring the lock, as in the sketch below.
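
For example, the reply handling above follows this pattern (a minimal sketch):

rf.mu.Lock()
defer rf.mu.Unlock()
// re-check: we may have lost leadership or moved to a new term
// while the lock was released and the RPC was in flight
if rf.state != LEADER || rf.currentTerm != term {
	return
}
// ...now it is safe to update matchIndex, nextIndex and commitIndex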

What Should We Do?

I think this is one of the most challenging parts of this lab. These responsibilities are scattered throughout the paper, and we have to think carefully about when to do something and what exactly to do.

Maintaining the state correctly is the crux of it.
