Project Overview
We are building a MapReduce system.
We are implementing a worker process that calls application Map and Reduce functions and handles reading and writing files, and a master process that hands out tasks to workers and copes with failed workers.
What We Already Have
- mrmaster.go: the entry point of the master program; it calls the master code
- mrworker.go: the entry point of the worker program; it calls the worker code
- the plugin: contains the application's Map and Reduce functions
Objects
We use Go structs:
- Master
- Worker
- the RPC Args and Reply
We can define their fields and methods later.
Process
In this distributed system, one master and at least one worker are running in parallel.
On the master side, construct the master, then let the master provide the RPC service.
On the worker side, the worker runs continuously until everything is done.
The worker asks the master for a task through RPC.
The master determines its own stage, then traverses the tasks, checks each task's status, and sends an uncompleted or timed-out task to the worker.
The worker receives the task, determines its type, and hands it to the corresponding Map or Reduce function. After completion, it sends the result to the master.
The master saves the result, then shuffles and sorts.
When all the tasks are done, the master changes its stage and exits. Workers receive a Done signal the next time they ask for a task, and then exit.
Highlights of the Code Implementation
Two (or More) Processes in Parallel
One master and at least one worker run in parallel. Their entry points are mrmaster.go and mrworker.go. In the main function of mrmaster.go, we invoke the mr.MakeMaster function (defined in master.go) to construct the master. In the main function of mrworker.go, we load the plugin first, then invoke the mr.Worker function (defined in worker.go) to run the worker.
Where to Store the Status
In the master. The status is stored in the fields of the Master struct. It includes:
```go
type Master struct {
    Files                    []string
    NReduce                  int
    Stage                    int   // 0 -> Map, 1 -> Reduce, 2 -> Done
    MapStatus                []int // 0 -> waiting, 1 -> executing, 2 -> done
    MapStartTime             []time.Time
    CompletedMapTaskCount    int
    ReduceStatus             []int // 0 -> waiting, 1 -> executing, 2 -> done
    ReduceStartTime          []time.Time
    CompletedReduceTaskCount int
    ReduceWorks              [][]string
}
```
Besides, we need a lock (a `sync.Mutex` field) to guard these fields.
Worker Works Continuously
Use an infinite for loop in the Worker() function, until the worker receives a Done flag and exits. Note that in Go, a bare break inside a switch case only exits the switch, not the enclosing loop, so we return (or use a labeled break) instead.
```go
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    for {
        // ...
        switch task.Tpe {
        // ...
        case 2:
            // Done: return, since a bare break would only exit the switch
            return
        // ...
        }
    }
}
```
The RPC
Note that we pass pointers to the RPC call. Encapsulate it in a helper to make the call sites simpler:
```go
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args *Args, reply *Reply) bool {
    // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
    sockname := masterSock()
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()

    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }

    fmt.Println(err)
    return false
}
```
How to Determine the Task Type
Use an int flag in the Reply struct to distinguish the Map, Reduce, Done, and Wait task types.
```go
// This is the task.
type Reply struct {
    // ...
    Tpe int // 0 -> Map, 1 -> Reduce, 2 -> Done, 3 -> Wait
    // ...
}
```
Work Distribution Implementation
As mentioned, the master holds the status. It saves each task's status and start time in slices whose indexes correspond. When receiving an AskForWork call, the master traverses all the tasks (Map or Reduce) and checks each one's status: if it is done, just continue; if it is waiting, hand the task to the worker; if it is executing, check whether it has timed out.
```go
// some code omitted
func (m *Master) AskForWork(args *Args, task *Reply) error {
    m.lock.Lock()
    defer m.lock.Unlock()
    switch m.Stage {
    case 0:
        // Map
        for i, status := range m.MapStatus {
            if status == 0 {
                // waiting: hand task i to the worker (elided)
            } else if status == 1 {
                // executing: check for timeout (see below)
            } else if status == 2 {
                // done: skip
            }
        }
        // nothing to hand out: tell the worker to wait
        task.Tpe = 3
    case 1:
        // Reduce: analogous to the Map case
    case 2:
        // Done
    }
    return nil
}
```
Fault-tolerance Design
The master also saves the start time of each executing task. When traversing in the AskForWork method, it checks whether a task has timed out (here, after 10 seconds). If so, it hands the task to the current worker.
```go
} else if status == 1 {
    // executing: reassign if it has run for more than 10 seconds
    if time.Now().After(m.MapStartTime[i].Add(10 * time.Second)) {
        task.Id = i
        task.Tpe = 0
        task.Filename = m.Files[i]
        m.MapStatus[i] = 1
        m.MapStartTime[i] = time.Now()
        return nil
    } else {
        continue
    }
}
```
One possible situation:
If a worker fails to communicate with the master, the task will be sent to another worker. In that case, two workers are executing the same task at the same time, and both will send their results to the master on completion.
This happens because the master cannot ask the problematic worker to stop (they cannot communicate with each other).
So we handle it when receiving results: in the FinishXxx methods, we use double-checked locking to make sure the same result is not saved twice.
```go
func (m *Master) FinishMap(args *Args, reply *Reply) error {
    if m.MapStatus[args.Id] == 1 { // cheap check without the lock
        m.lock.Lock()
        defer m.lock.Unlock()
        if m.MapStatus[args.Id] == 1 { // re-check while holding the lock
            // ...
        }
    }
    return nil
}
```
What Does the Worker Do When Receiving a Task
Map task:
Get the file's content via its filename, invoke mapf to do the actual Map work, and finally send the result back over RPC.
```go
case 0:
    // Map
    content := getContentThroughFilename(task.Filename)
    kvs := mapf(task.Filename, content)
    args := Args{task.Id, kvs}
    reply := Reply{}
    call("Master.FinishMap", &args, &reply)
```
Reduce task:
Traverse the filenames, appending every key-value pair from each file into a hash map. Then traverse the hash map, invoking reducef on each key and its values. Finally, send the result back over RPC.
```go
case 1:
    // Reduce
    filenames := task.Filenames
    hashMap := make(map[string][]string)
    for _, filename := range filenames {
        file, err := os.Open(filename)
        if err != nil {
            log.Fatalf("Cannot open %v", filename)
        }
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            line := scanner.Text()
            kv := strings.Split(line, " ")
            key := kv[0]
            value := kv[1]
            hashMap[key] = append(hashMap[key], value)
        }
        file.Close() // close each intermediate file before the next
    }
    var kvs []KeyValue
    for key, parts := range hashMap {
        value := reducef(key, parts)
        kv := KeyValue{key, value}
        kvs = append(kvs, kv)
    }
    args := Args{task.Id, kvs}
    reply := Reply{}
    call("Master.FinishReduce", &args, &reply)
```
What Does the Master Do When Receiving a Result
Map task:
Save the result to intermediate files named mr-X-Y, and record each filename in m.ReduceWorks[y]. The partition index is y = ihash(key) % NReduce.
```go
if m.MapStatus[args.Id] == 1 {
    // store to "mr-X-Y" files
    groups := make(map[int][]KeyValue)
    for _, kv := range args.Kvs {
        y := ihash(kv.Key) % m.NReduce
        groups[y] = append(groups[y], kv)
    }
    for y, kvs := range groups {
        filename := "mr-" + strconv.Itoa(args.Id) + "-" + strconv.Itoa(y)
        // about the first param: can't use "" while using Silverblue
        tempFile, err := ioutil.TempFile(".", "temp-")
        if err != nil {
            return err
        }
        for _, kv := range kvs {
            tempFile.WriteString(kv.Key + " " + kv.Value + "\n")
        }
        tempFile.Close()
        os.Rename(tempFile.Name(), filename)
        m.ReduceWorks[y] = append(m.ReduceWorks[y], filename)
    }
    // change the status of the task
    m.MapStatus[args.Id] = 2
    m.CompletedMapTaskCount++
    // change the stage of the master
    if m.CompletedMapTaskCount == len(m.Files) {
        m.Stage = 1
    }
}
```
Reduce task:
The master receives a []KeyValue, collects it into a hash map, sorts the keys, and writes the output to the final file in key order.
```go
if m.ReduceStatus[args.Id] == 1 {
    hashMap := make(map[string]string)
    var keys []string
    for _, kv := range args.Kvs {
        hashMap[kv.Key] = kv.Value
        keys = append(keys, kv.Key)
    }
    sort.Strings(keys)
    filename := "mr-out-" + strconv.Itoa(args.Id)
    file, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer file.Close()
    for _, key := range keys {
        fmt.Fprintf(file, "%v %v\n", key, hashMap[key])
    }
    // change the status of the task
    m.ReduceStatus[args.Id] = 2
    m.CompletedReduceTaskCount++
    // change the stage of the master
    if m.CompletedReduceTaskCount == m.NReduce {
        m.Stage = 2
    }
}
```
Test
Passed all tests:
```
$ sudo sh test-mr.sh
*** Starting wc test.
read unix @->/var/tmp/824-mr-0: read: connection reset by peer
2025/02/23 22:42:29 dialing:dial-http unix /var/tmp/824-mr-0: unexpected EOF
2025/02/23 22:42:29 cannot open
2025/02/23 22:42:29 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
--- wc test: PASS
*** Starting indexer test.
2025/02/23 22:42:32 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
2025/02/23 22:42:32 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
--- indexer test: PASS
*** Starting map parallelism test.
2025/02/23 22:42:39 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
unexpected EOF
2025/02/23 22:42:39 cannot open
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2025/02/23 22:42:45 cannot open
--- reduce parallelism test: PASS
2025/02/23 22:42:48 dialing:dial-http unix /var/tmp/824-mr-0: unexpected EOF
*** Starting crash test.
2025/02/23 22:43:13 cannot open
2025/02/23 22:43:14 cannot open
2025/02/23 22:43:14 cannot open
2025/02/23 22:43:15 cannot open
2025/02/23 22:43:15 cannot open
2025/02/23 22:43:16 cannot open
2025/02/23 22:43:16 cannot open
2025/02/23 22:43:16 cannot open
2025/02/23 22:43:17 cannot open
2025/02/23 22:43:17 cannot open
2025/02/23 22:43:17 cannot open
2025/02/23 22:43:18 cannot open
2025/02/23 22:43:18 cannot open
2025/02/23 22:43:18 cannot open
2025/02/23 22:43:19 cannot open
2025/02/23 22:43:19 cannot open
2025/02/23 22:43:19 cannot open
2025/02/23 22:43:20 cannot open
2025/02/23 22:43:20 cannot open
2025/02/23 22:43:20 cannot open
2025/02/23 22:43:21 cannot open
2025/02/23 22:43:21 cannot open
2025/02/23 22:43:21 cannot open
2025/02/23 22:43:22 cannot open
2025/02/23 22:43:22 cannot open
2025/02/23 22:43:23 cannot open
unexpected EOF
unexpected EOF
2025/02/23 22:43:25 dialing:dial-http unix /var/tmp/824-mr-0: unexpected EOF
2025/02/23 22:43:25 cannot open
2025/02/23 22:43:25 cannot open
--- crash test: PASS
*** PASSED ALL TESTS
```
Conclusion
Stateless MapReduce and Fault-tolerance
The Map and Reduce functions are stateless. This means workers can execute tasks independently, and a failed worker will not cause the whole program to fail.
State Machine
We save the states, change the states, distinguish between different states, and do the correct thing in each state.
Program == Logic + Control + Data Structure
The objects (structs) we designed are the data structure.
The state machine is the logic.
The code implementation is the control.