MIT 6.824 Lab2 Raft Part A

Raft code implementation in Golang

Project Overview

Implement Raft leader election and heartbeats (AppendEntries RPCs with no log entries).

Objects

Figure 2 of the Raft paper specifies the data structures clearly, so they can be transcribed almost field for field.

Raft

The fields of the Raft struct fall into four groups:

  • metadata
  • state
  • locks
  • support for periodic behavior
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// role this peer currently believes it holds (follower, candidate or leader)
	state int

	// persistent state on all servers
	currentTerm int
	votedFor    int // set to -1 at beginning
	log         []LogEntry

	// volatile state on all servers
	commitIndex int
	lastApplied int

	// volatile state on leaders
	nextIndex  []int
	matchIndex []int

	// the election timeout is re-randomized on every wait, so it is not stored here

	heartbeatInterval      time.Duration
	resetElectionTimerChan chan struct{}
	done                   chan struct{}
}

type LogEntry struct {
	Term    int
	Command interface{}
}
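
The state field holds one of three roles. The post only shows FOLLOWER, so the sketch below assumes the other two constants are named CANDIDATE and LEADER in the same style; stateName is a hypothetical helper for debug output and is not part of the lab skeleton.

```go
package main

// Role constants for the state field. FOLLOWER appears in the post;
// CANDIDATE and LEADER are assumed names that follow the same style.
const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

// stateName is a hypothetical helper that turns a role constant into a
// readable string, which is handy in debug logs.
func stateName(state int) string {
	switch state {
	case FOLLOWER:
		return "follower"
	case CANDIDATE:
		return "candidate"
	case LEADER:
		return "leader"
	default:
		return "unknown"
	}
}
```

Because FOLLOWER is the zero value, a freshly allocated Raft struct starts as a follower even before the field is set explicitly.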

Args and Reply of RPC

Define the argument and reply structs for each RPC. The fields follow Figure 2 of the paper.

type RequestVoteArgs struct {
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}


type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}

Highlights of the Code Implementation

Start of Raft

  1. construct the object
  2. start election goroutine
  3. start heartbeat goroutine
  4. return

Note that Make() must return quickly, so any long-running work is started in goroutines.

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	// Your initialization code here (2A, 2B, 2C).
	rf := &Raft{
		peers:                  peers,
		persister:              persister,
		me:                     me,
		state:                  FOLLOWER,
		currentTerm:            0,
		votedFor:               -1,
		log:                    make([]LogEntry, 0),
		commitIndex:            0,
		lastApplied:            0,
		nextIndex:              make([]int, len(peers)),
		matchIndex:             make([]int, len(peers)),
		heartbeatInterval:      200 * time.Millisecond, // the tester limits you to 10 heartbeats per second
		resetElectionTimerChan: make(chan struct{}, 1),
		done:                   make(chan struct{}),
	}

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start background goroutines
	go rf.electionTimerGoroutine()
	go rf.heartbeatGoroutine()

	return rf
}
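
The body of heartbeatGoroutine is not shown in the post. A minimal sketch of the periodic part, assuming a ticker plus the done channel, could look like this; runPeriodically is a hypothetical name, and the real goroutine would pass a function that locks rf.mu, checks that the peer is still the leader, and sends empty AppendEntries RPCs.

```go
package main

import "time"

// runPeriodically calls fn once per interval until done is closed.
// It is a hypothetical helper, not part of the lab skeleton.
func runPeriodically(interval time.Duration, done <-chan struct{}, fn func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			// the peer was killed: stop sending heartbeats
			return
		case <-ticker.C:
			fn()
		}
	}
}
```

Selecting on done lets the goroutine exit promptly after Kill() instead of waking up once more after a full sleep.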

Timing

Each background goroutine is a for loop around time.Sleep(). The election timeout is drawn at random on every iteration.

	for !rf.killed() {
		// pick a fresh random timeout on every iteration
		electionTimeout := time.Duration(300+rand.Intn(150)) * time.Millisecond
		time.Sleep(electionTimeout)
		// do periodic things
	}
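
The post does not show how resetElectionTimerChan is consumed. One possible way, sketched here as an assumption rather than the author's code, is to replace the plain sleep with a select over a timer, the reset channel and the done channel. The names randomElectionTimeout and waitForTimeout are hypothetical.

```go
package main

import (
	"math/rand"
	"time"
)

// randomElectionTimeout returns a duration in the 300-450 ms range,
// the same range used by the sleep loop in the post.
func randomElectionTimeout() time.Duration {
	return time.Duration(300+rand.Intn(150)) * time.Millisecond
}

// waitForTimeout blocks until one of three things happens and reports
// whether the timeout expired without being reset:
//   - reset fires: a heartbeat or a granted vote arrived (returns false)
//   - done is closed: the peer was killed (returns false)
//   - the timer expires: the caller should start an election (returns true)
func waitForTimeout(timeout time.Duration, reset <-chan struct{}, done <-chan struct{}) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-reset:
		return false
	case <-done:
		return false
	case <-timer.C:
		return true
	}
}
```

An election loop built on this would call waitForTimeout(randomElectionTimeout(), rf.resetElectionTimerChan, rf.done) and start an election only when it returns true and the peer is not already the leader.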

Starting Election

  1. add 1 to current term
  2. vote for self
  3. prepare RPC args and replies
  4. traverse all the peers and send RPC
  5. count votes

Pay attention to state changes.

Code that runs outside the mutex cannot assume the state is stable, because it may change at any time. For instance, if reply.Term > rf.currentTerm, this Raft instance steps down and is no longer a candidate, but goroutines still waiting on other replies do not know that. So before acting on anything, first call rf.mu.Lock() and re-check the state.
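
The lock-then-re-check pattern can be sketched on a stripped-down stand-in for the Raft struct. Everything here is an illustration under assumptions: the peer type, the votes and majority fields, the onVoteReply name, and the CANDIDATE and LEADER constants do not appear in the post.

```go
package main

import "sync"

// Role constants; only FOLLOWER appears in the post, the others are assumed.
const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

// peer is a hypothetical, minimal stand-in for the Raft struct.
type peer struct {
	mu          sync.Mutex
	state       int
	currentTerm int
	votedFor    int
	votes       int // votes collected in the current election
	majority    int // votes needed to win
}

// onVoteReply processes one RequestVote reply. electionTerm is the term
// in which the request was sent.
func (p *peer) onVoteReply(electionTerm, replyTerm int, granted bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// A higher term always wins: step down and clear the vote.
	if replyTerm > p.currentTerm {
		p.currentTerm = replyTerm
		p.votedFor = -1
		p.state = FOLLOWER
		return
	}
	// Ignore stale replies: the election may be over or superseded.
	if p.state != CANDIDATE || p.currentTerm != electionTerm {
		return
	}
	if granted {
		p.votes++
		if p.votes >= p.majority {
			p.state = LEADER
		}
	}
}
```

Comparing against electionTerm as well as the state guards against a reply from an old election being counted in a newer one.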

Receiving RPCs

RequestVote RPC

  1. lock
  2. check term
  3. log up-to-date check
  4. if the vote is granted, reset the election timer
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.state = FOLLOWER
	}

	reply.Term = rf.currentTerm

	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		return
	}

	// up-to-date check
	lastLogIndex := 0
	lastLogTerm := 0
	if len(rf.log) > 0 {
		lastLogIndex = len(rf.log)
		lastLogTerm = rf.log[lastLogIndex-1].Term
	}
	logUpToDate := args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)

	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && logUpToDate {
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		// reset election timer
		select {
		case rf.resetElectionTimerChan <- struct{}{}:
		default:
		}
	} else {
		reply.VoteGranted = false
	}
}
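
The up-to-date comparison in the handler can be isolated as a pure function, which makes the rule easy to test: the later last term wins, and with equal terms the longer log wins. The function name candidateIsUpToDate is hypothetical.

```go
package main

// candidateIsUpToDate reports whether the candidate's log is at least as
// up-to-date as the voter's. The log with the later last term wins; if
// the last terms are equal, the log with the higher last index wins.
func candidateIsUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex int) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm
	}
	return candLastIndex >= myLastIndex
}
```

In Part A every log is empty, so both sides report term 0 and index 0 and the check always passes; it starts to matter once entries are replicated.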

AppendEntries RPC

  1. lock
  2. term check
  3. reset election timeout
  4. log consistency check
  5. todo for lab2b: append new entries
  6. update commit index
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}

	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}

	// At most one leader can exist per term, so a valid AppendEntries means this peer is not the leader of args.Term.
	rf.state = FOLLOWER

	select {
	case rf.resetElectionTimerChan <- struct{}{}:
	default:
	}

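	// log consistency check: the entry at PrevLogIndex (1-based) must exist and carry PrevLogTerm.
	if args.PrevLogIndex > 0 {
		if len(rf.log) < args.PrevLogIndex {
			reply.Term = rf.currentTerm
			reply.Success = false
			return
		}
		if rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
			reply.Term = rf.currentTerm
			reply.Success = false
			// drop the conflicting entry and everything after it
			rf.log = rf.log[:args.PrevLogIndex-1]
			return
		}
	}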

	// TODO append new entries

	// update commit index
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = min(args.LeaderCommit, len(rf.log))
	}

	reply.Term = rf.currentTerm
	reply.Success = true
}
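
The consistency check is easy to get wrong because Raft log indices are 1-based while the Go slice is 0-based. A small sketch of the check as a pure function, with the hypothetical name matchesPrev, makes the index arithmetic explicit.

```go
package main

// LogEntry mirrors the struct defined earlier in the post.
type LogEntry struct {
	Term    int
	Command interface{}
}

// matchesPrev reports whether log contains an entry at the 1-based index
// prevLogIndex whose term equals prevLogTerm. prevLogIndex == 0 means
// "before the first entry" and always matches.
func matchesPrev(log []LogEntry, prevLogIndex, prevLogTerm int) bool {
	if prevLogIndex == 0 {
		return true
	}
	if prevLogIndex > len(log) {
		// the follower's log is too short to contain the entry
		return false
	}
	// 1-based Raft index i lives at slice position i-1
	return log[prevLogIndex-1].Term == prevLogTerm
}
```

Heartbeats in Part A are sent with empty logs, so PrevLogIndex is 0 and the check passes trivially.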

Test

Passed.

$ go test -run 2A
Test (2A): initial election ...
  ... Passed --   3.0  3   30    7594    0
Test (2A): election after network failure ...
  ... Passed --   4.5  3   80   14810    0
PASS
ok  	github.com/alioth4j/6.824/src/raft	7.493s

Conclusion

Thanks to the Paper

Most of lab2a maps directly onto the Raft paper, especially Figure 2.

Everything Lies in State

  • The structs store state.
  • Synchronization protects shared state.
  • Check the state first, then do the corresponding thing.
  • RPCs exchange state between Raft instances.
  • Compare the received state with the local state and update the local state accordingly.
Licensed under CC BY-NC-SA 4.0