MIT 6.824 Lab1 MapReduce

MapReduce code implementation in Golang

Project Overview

We are building a MapReduce system.

We are implementing a worker process that calls application Map and Reduce functions and handles reading and writing files, and a master process that hands out tasks to workers and copes with failed workers.

What We Already Have

  • mrmaster.go: the entry point of the master program; it calls into the master code
  • mrworker.go: the entry point of the worker program; it calls into the worker code
  • the plugin: contains the application's Map and Reduce functions
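
To make the plugin's role concrete, here is a minimal sketch of what a word-count plugin could look like. It assumes the Map and Reduce signatures that the Worker function takes later in this post; the bodies are an illustration, and the plugin shipped with the lab may differ in detail.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the pair type that the Map function returns.
type KeyValue struct {
	Key   string
	Value string
}

// Map emits one {word, "1"} pair for every word in the file contents.
func Map(filename string, contents string) []KeyValue {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, notLetter)
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// Reduce returns how many values were collected for one key.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kvs := Map("example.txt", "the cat and the hat")
	fmt.Println(len(kvs))
	fmt.Println(Reduce("the", []string{"1", "1"}))
}
```

The framework never looks inside these functions. It only feeds them input and collects their output, which is what lets the same master and worker run different applications.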

Objects

Use structs in Go.

  • Master
  • Worker
  • rpc Args and Reply

We can define fields and methods later.
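
As a rough sketch of the RPC objects, the field names below are the ones used in the snippets later in this post (args.Id, args.Kvs, task.Tpe, task.Filename, task.Filenames). The field types and comments are assumptions.

```go
package main

import "fmt"

// KeyValue is the pair type produced by Map and Reduce.
type KeyValue struct {
	Key   string
	Value string
}

// Args is what a worker sends when it reports a finished task.
type Args struct {
	Id  int        // index of the finished task
	Kvs []KeyValue // output of the Map or Reduce call
}

// Reply is what the master sends back: it describes one task.
type Reply struct {
	Id        int      // task index
	Tpe       int      // 0 -> Map, 1 -> Reduce, 2 -> Done, 3 -> Wait
	Filename  string   // input file of a Map task
	Filenames []string // intermediate files of a Reduce task
}

func main() {
	task := Reply{Id: 3, Tpe: 0, Filename: "pg-example.txt"}
	result := Args{Id: task.Id, Kvs: []KeyValue{{"word", "1"}}}
	fmt.Printf("%+v\n%+v\n", task, result)
}
```

All fields are exported (capitalized) because Go's RPC encoding only transmits exported fields.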

Process

In this distributed system, one master and at least one worker are running in parallel.

  1. On the master side, construct the master, then let it serve RPC requests.
    On the worker side, the worker runs continuously until everything is done.

  2. The worker asks the master for a task through RPC.

  3. The master checks which stage it is in, then traverses the tasks of that stage, checks each task's status, and sends a waiting or timed-out task to the worker.

  4. The worker receives the task, determines its type, and hands it to the corresponding Map or Reduce function. After completion, it sends the result to the master.

  5. The master saves the result, shuffles and sorts.

  6. When all the tasks are done, the master changes its stage to Done and exits. A worker that asks for a task after that receives a Done signal and exits as well.

Highlights of the Code Implementation

Two (or More) Processes in Parallel

One master and at least one worker run in parallel. Their entry points are mrmaster.go and mrworker.go. The main function in mrmaster.go invokes mr.MakeMaster, defined in master.go, to construct the master. The main function in mrworker.go loads the plugin first, then invokes mr.Worker, defined in worker.go, to run the worker.

Where to Store the Status

In the master. The status is stored in the fields of the Master struct.
The status includes:

	Files   []string
	NReduce int

	Stage                    int   // 0 -> Map, 1 -> Reduce, 2 -> Done
	MapStatus                []int // 0 -> waiting, 1 -> executing, 2 -> done
	MapStartTime             []time.Time
	CompletedMapTaskCount    int
	ReduceStatus             []int // 0 -> waiting, 1 -> executing, 2 -> done
	ReduceStartTime          []time.Time
	CompletedReduceTaskCount int

	ReduceWorks [][]string

In addition, we need a lock, because the RPC handlers run concurrently and share this state:

	lock sync.Mutex
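
The slices above have to be sized before the first task is handed out. A minimal sketch of that initialization follows; newMaster is a hypothetical helper for illustration, not the lab's MakeMaster, which also has to start the RPC server.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Master holds all task state; the fields match the list above.
type Master struct {
	Files   []string
	NReduce int

	Stage                    int   // 0 -> Map, 1 -> Reduce, 2 -> Done
	MapStatus                []int // 0 -> waiting, 1 -> executing, 2 -> done
	MapStartTime             []time.Time
	CompletedMapTaskCount    int
	ReduceStatus             []int
	ReduceStartTime          []time.Time
	CompletedReduceTaskCount int

	ReduceWorks [][]string

	lock sync.Mutex
}

// newMaster (hypothetical) sizes one slot per map task (one per input file)
// and one slot per reduce task. Zero values mean "Map stage, all waiting".
func newMaster(files []string, nReduce int) *Master {
	return &Master{
		Files:           files,
		NReduce:         nReduce,
		MapStatus:       make([]int, len(files)),
		MapStartTime:    make([]time.Time, len(files)),
		ReduceStatus:    make([]int, nReduce),
		ReduceStartTime: make([]time.Time, nReduce),
		ReduceWorks:     make([][]string, nReduce),
	}
}

func main() {
	m := newMaster([]string{"a.txt", "b.txt"}, 3)
	fmt.Println(len(m.MapStatus), len(m.ReduceStatus), len(m.ReduceWorks), m.Stage)
}
```

Because 0 is used for both the Map stage and the waiting status, Go's zero values give a valid initial state without further setup.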

Worker works continuously

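Use an infinite for loop in the Worker() function. It runs until the worker receives a Done task and then returns. Note that a bare break inside a switch only leaves the switch, not the enclosing for loop, so the Done case has to return.

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		// ...

		switch task.Tpe {
		// ...

		case 2:
			// Done: return, because a bare break here would only leave the switch
			return

		// ...
		}
	}
}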
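
One Go detail matters for a loop like this: a bare break inside a switch leaves only the switch, not the enclosing for. The sketch below compares a bare break with a labeled break; the function names and task values are illustrative and not part of the lab code.

```go
package main

import "fmt"

// runWithBareBreak counts handled tasks. Task type 2 means Done, but the bare
// break only leaves the switch, so the loop keeps going after Done.
func runWithBareBreak(tasks []int) int {
	handled := 0
	for _, tpe := range tasks {
		switch tpe {
		case 2:
			break // leaves the switch only
		default:
			handled++
		}
	}
	return handled
}

// runWithLabel stops the whole loop at Done by breaking out of the labeled for.
func runWithLabel(tasks []int) int {
	handled := 0
loop:
	for _, tpe := range tasks {
		switch tpe {
		case 2:
			break loop // leaves the for loop
		default:
			handled++
		}
	}
	return handled
}

func main() {
	tasks := []int{0, 1, 2, 0}
	fmt.Println(runWithBareBreak(tasks)) // 3: the task after Done is still handled
	fmt.Println(runWithLabel(tasks))     // 2: the loop stops at Done
}
```

In a function whose only job is the loop, a plain return at the Done case has the same effect as the labeled break.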

The RPC

Note that args and reply are passed to the call as pointers, so that the RPC library can fill in the reply.

Wrap the call in a helper to make it easier to use:

// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args *Args, reply *Reply) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
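
To see the pointer convention in isolation, here is a minimal round trip with net/rpc. As an assumption made to keep the example self-contained, it uses an in-memory net.Pipe instead of the Unix socket and HTTP transport shown above, and the handler body is made up.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type Args struct{ Id int }

type Reply struct {
	Tpe      int
	Filename string
}

type Master struct{}

// AskForWork has the shape net/rpc requires: an exported method with two
// arguments, the second one a pointer, and an error return value.
func (m *Master) AskForWork(args *Args, reply *Reply) error {
	reply.Tpe = 0
	reply.Filename = "pg-example.txt" // made-up file name
	return nil
}

// askOnce performs a single RPC over an in-memory connection.
func askOnce() (Reply, error) {
	server := rpc.NewServer()
	if err := server.Register(&Master{}); err != nil {
		return Reply{}, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	args := Args{}
	reply := Reply{}
	// The reply must be a pointer so the library can write into it.
	err := client.Call("Master.AskForWork", &args, &reply)
	return reply, err
}

func main() {
	reply, err := askOnce()
	fmt.Println(reply.Tpe, reply.Filename, err)
}
```

The method name passed to Call is the registered type name and the method name joined by a dot, which is why the worker calls "Master.FinishMap" and "Master.FinishReduce".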

How to determine the task type

Use an int flag in the Reply struct to distinguish the Map, Reduce, Done and Wait task types.

// This is the task.
type Reply struct {
    // ...
	Tpe int // 0 -> Map, 1 -> Reduce, 2 -> Done, 3 -> Wait
    // ...
}
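
The bare integers work, but named constants make the switch statements easier to read. A possible variant, not what the code in this post uses, is a small TaskType with iota; all names below are hypothetical.

```go
package main

import "fmt"

// TaskType gives names to the integer values of the Tpe flag.
type TaskType int

const (
	MapTask    TaskType = iota // 0
	ReduceTask                 // 1
	DoneTask                   // 2
	WaitTask                   // 3
)

// String makes task types readable in logs.
func (t TaskType) String() string {
	switch t {
	case MapTask:
		return "Map"
	case ReduceTask:
		return "Reduce"
	case DoneTask:
		return "Done"
	case WaitTask:
		return "Wait"
	}
	return "Unknown"
}

func main() {
	for _, t := range []TaskType{MapTask, ReduceTask, DoneTask, WaitTask} {
		fmt.Printf("%d -> %v\n", int(t), t)
	}
}
```

The numeric values stay the same as in the comment on Tpe, so the wire format would not change.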

Work Distribution Implementation

As said before, the master holds the status. It keeps the status and the start time of each task in slices whose indexes correspond to each other. When it receives an AskForWork call, the master traverses all the tasks of the current stage (Map or Reduce) and checks the status of each one. If the status is done, it moves on; if the status is waiting, it hands the task to the worker; if the status is executing, it checks whether the task has timed out.

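// some code omitted
func (m *Master) AskForWork(args *Args, task *Reply) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	switch m.Stage {
	case 0:
		// Map
		for i, status := range m.MapStatus {
			if status == 0 {
				// waiting: hand task i to this worker and return
			} else if status == 1 {
				// executing: hand task i out again only if it has timed out
			} else if status == 2 {
				// done: nothing to do
			}
		}
		// nothing can be handed out right now: tell the worker to wait
		task.Tpe = 3
	case 1:
		// Reduce: the same traversal over m.ReduceStatus
	case 2:
		// Done: tell the worker to exit
	}

	return nil
}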
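
The traversal can be tried out in isolation. The following is a minimal sketch under stated assumptions: pickTask and demo are hypothetical helpers, the current time is passed in so the behavior is deterministic, and a return value of -1 stands for "tell the worker to wait".

```go
package main

import (
	"fmt"
	"time"
)

const (
	waiting   = 0
	executing = 1
	done      = 2
)

// pickTask returns the index of the first task that is waiting, or that has
// been executing for longer than timeout. The chosen task is marked as
// executing and its start time is reset. It returns -1 if no task can be
// handed out right now.
func pickTask(status []int, started []time.Time, now time.Time, timeout time.Duration) int {
	for i, s := range status {
		if s == waiting || (s == executing && now.Sub(started[i]) > timeout) {
			status[i] = executing
			started[i] = now
			return i
		}
	}
	return -1
}

// demo asks for a task three times against a fixed set of four tasks.
func demo() []int {
	now := time.Now()
	status := []int{done, executing, executing, waiting}
	started := []time.Time{
		now.Add(-30 * time.Second), // done, never handed out again
		now.Add(-2 * time.Second),  // executing, still within the limit
		now.Add(-15 * time.Second), // executing, timed out
		{},                         // waiting, never started
	}
	var picks []int
	for n := 0; n < 3; n++ {
		picks = append(picks, pickTask(status, started, now, 10*time.Second))
	}
	return picks
}

func main() {
	fmt.Println(demo()) // [2 3 -1]
}
```

The first request gets the timed-out task 2, the second gets the waiting task 3, and the third finds nothing to hand out.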

Fault-tolerance Design

The master also saves the time at which each task started executing. While traversing the tasks in AskForWork, it checks whether an executing task has timed out (here, after 10 seconds). If so, it hands the task to the current worker.

			} else if status == 1 {
				if time.Now().After(m.MapStartTime[i].Add(10 * time.Second)) {
					task.Id = i
					task.Tpe = 0
					task.Filename = m.Files[i]
					m.MapStatus[i] = 1
					m.MapStartTime[i] = time.Now()
					return nil
				} else {
					continue
				}

One possible situation:

If a worker fails to communicate with the master, its task will be sent to another worker after the timeout. In that case two workers are executing the same task at the same time, and both will send the result to the master after completion.

This happens because the master cannot ask the problematic worker to stop executing (they cannot communicate with each other).

So we handle this when receiving results.

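In the FinishXxx methods, we take the lock and then check that the task is still executing, so the same result is never saved twice. The check has to happen while holding the lock: reading the status before locking (double-checked locking) is a data race in Go.

func (m *Master) FinishMap(args *Args, reply *Reply) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	// accept only the first result for this task
	if m.MapStatus[args.Id] == 1 {
		// ...
	}
	return nil
}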
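
The idea of accepting only the first result can be checked with a small experiment. In this sketch, tracker and finish are hypothetical stand-ins for the master and its FinishMap method, and two goroutines play the two workers that report the same task.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a stripped-down stand-in for the master's task state.
type tracker struct {
	lock      sync.Mutex
	status    []int // 0 -> waiting, 1 -> executing, 2 -> done
	completed int
}

// finish records a result for task id and reports whether it was accepted.
// The status check happens while the lock is held.
func (t *tracker) finish(id int) bool {
	t.lock.Lock()
	defer t.lock.Unlock()
	if t.status[id] != 1 {
		return false // already done: ignore the duplicate
	}
	t.status[id] = 2
	t.completed++
	return true
}

// reportTwice lets two goroutines report the same task and returns how many
// results were counted.
func reportTwice() int {
	t := &tracker{status: []int{1}}
	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.finish(0)
		}()
	}
	wg.Wait()
	return t.completed
}

func main() {
	fmt.Println(reportTwice()) // 1
}
```

Whichever goroutine takes the lock first wins; the other one sees status 2 and its result is dropped.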

What Does Worker Do When Receiving a Task

Map task:
Read the content of the file named by the task, then invoke mapf to run the real Map. Finally, send the result to the master through RPC.

		case 0:
			// Map
			content := getContentThroughFilename(task.Filename)
			kvs := mapf(task.Filename, content)
			args := Args{task.Id, kvs}
			reply := Reply{}
			call("Master.FinishMap", &args, &reply)

Reduce task:
Traverse the file names and add each key-value pair from each file to a map from key to list of values. Then traverse the map and invoke reducef on each key and its values. Finally, send the result to the master through RPC.

		case 1:
			// Reduce
			filenames := task.Filenames
			hashMap := make(map[string][]string)
			for _, filename := range filenames {
				file, err := os.Open(filename)
				if err != nil {
					log.Fatalf("Cannot open %v", filename)
				}
				scanner := bufio.NewScanner(file)
				for scanner.Scan() {
					line := scanner.Text()
					// split on the first space only, so a value may contain spaces
					kv := strings.SplitN(line, " ", 2)
					if len(kv) != 2 {
						log.Fatalf("Malformed line in %v: %q", filename, line)
					}
					key := kv[0]
					value := kv[1]
					hashMap[key] = append(hashMap[key], value)
				}
				if err := scanner.Err(); err != nil {
					log.Fatalf("Cannot read %v: %v", filename, err)
				}
				file.Close()
			}
			// run the real Reduce once per key
			var kvs []KeyValue
			for key, parts := range hashMap {
				value := reducef(key, parts)
				kv := KeyValue{key, value}
				kvs = append(kvs, kv)
			}
			args := Args{task.Id, kvs}
			reply := Reply{}
			call("Master.FinishReduce", &args, &reply)
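
The group-then-reduce step can be tried without any files. This sketch assumes the same "key value" line format as the intermediate files; groupLines, reduceAll and demoCounts are hypothetical helpers, the input is an in-memory string, and the reduce function simply counts values.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

type KeyValue struct {
	Key   string
	Value string
}

// groupLines parses "key value" lines and collects the values of each key.
func groupLines(input string) map[string][]string {
	groups := make(map[string][]string)
	scanner := bufio.NewScanner(strings.NewReader(input))
	for scanner.Scan() {
		parts := strings.SplitN(scanner.Text(), " ", 2)
		if len(parts) != 2 {
			continue // this sketch skips malformed lines
		}
		groups[parts[0]] = append(groups[parts[0]], parts[1])
	}
	return groups
}

// reduceAll calls reducef once per key. The order of the result is not
// defined, because Go randomizes map iteration order.
func reduceAll(groups map[string][]string, reducef func(string, []string) string) []KeyValue {
	var kvs []KeyValue
	for key, values := range groups {
		kvs = append(kvs, KeyValue{key, reducef(key, values)})
	}
	return kvs
}

// demoCounts runs both steps and returns the reduced value of each key.
func demoCounts() map[string]string {
	count := func(key string, values []string) string {
		return strconv.Itoa(len(values))
	}
	out := make(map[string]string)
	for _, kv := range reduceAll(groupLines("a 1\nb 1\na 1\n"), count) {
		out[kv.Key] = kv.Value
	}
	return out
}

func main() {
	fmt.Println(demoCounts()) // map[a:2 b:1]
}
```

Since the reduced pairs come out in no particular order, something downstream has to sort them before the final output is written.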

What Does Master Do When Receiving a Result

Map task:
Save the result to intermediate files, one per reduce bucket, and record each file name in m.ReduceWorks[y].
The bucket is y = ihash(key) % nReduce.

		if m.MapStatus[args.Id] == 1 {
			// store to "mr-x-y" file
			groups := make(map[int][]KeyValue)
			for _, kv := range args.Kvs {
				y := ihash(kv.Key) % m.NReduce
				groups[y] = append(groups[y], kv)
			}

			for y, kvs := range groups {
				filename := "mr-" + strconv.Itoa(args.Id) + "-" + strconv.Itoa(y)
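				// the first argument is the directory; "" (the default temp directory)
				// did not work on Silverblue, so use the current directory
				tempFile, err := ioutil.TempFile(".", "temp-")
				if err != nil {
					return err
				}

				for _, kv := range kvs {
					tempFile.WriteString(kv.Key + " " + kv.Value + "\n")
				}
				tempFile.Close()

				// publish the file atomically; record it only if the rename succeeded
				if err := os.Rename(tempFile.Name(), filename); err != nil {
					return err
				}
				m.ReduceWorks[y] = append(m.ReduceWorks[y], filename)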
			}

			// change the status of the work
			m.MapStatus[args.Id] = 2
			m.CompletedMapTaskCount++

			// change the stage of the master
			if m.CompletedMapTaskCount == len(m.Files) {
				m.Stage = 1
			}
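
The partitioning step is easy to isolate. In the sketch below, ihash is an assumption modeled on an FNV-1a hash with the sign bit cleared (this post does not show its definition), and partition and intermediateName are hypothetical helpers.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type KeyValue struct {
	Key   string
	Value string
}

// ihash maps a key to a non-negative int (assumed definition).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// partition groups pairs by reduce bucket y = ihash(key) % nReduce.
func partition(kvs []KeyValue, nReduce int) map[int][]KeyValue {
	groups := make(map[int][]KeyValue)
	for _, kv := range kvs {
		y := ihash(kv.Key) % nReduce
		groups[y] = append(groups[y], kv)
	}
	return groups
}

// intermediateName builds the "mr-x-y" name for map task x and bucket y.
func intermediateName(x, y int) string {
	return fmt.Sprintf("mr-%d-%d", x, y)
}

func main() {
	kvs := []KeyValue{{"apple", "1"}, {"pear", "1"}, {"apple", "1"}}
	for y, group := range partition(kvs, 3) {
		fmt.Println(intermediateName(0, y), group)
	}
}
```

Because the bucket depends only on the key, every pair with the same key ends up in the same reduce task, no matter which map task produced it.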

Reduce task:
The master receives a []KeyValue, stores the pairs in a map, and sorts the keys. Finally, it writes the pairs to the final output file in key order.

		if m.ReduceStatus[args.Id] == 1 {
			hashMap := make(map[string]string)
			var arrayList []string
			for _, kv := range args.Kvs {
				hashMap[kv.Key] = kv.Value
				arrayList = append(arrayList, kv.Key)
			}
			sort.Strings(arrayList)

			filename := "mr-out-" + strconv.Itoa(args.Id)
			file, err := os.Create(filename)
			if err != nil {
				return err
			}
			defer file.Close()
			for _, key := range arrayList {
				fmt.Fprintf(file, "%v %v\n", key, hashMap[key])
			}

			// change the status of the work
			m.ReduceStatus[args.Id] = 2
			m.CompletedReduceTaskCount++

			// change the stage of Master
			if m.CompletedReduceTaskCount == m.NReduce {
				m.Stage = 2
			}
		}
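
The same ordering can also be produced by sorting the pairs directly instead of going through a map and a key list. A minimal sketch, where formatSorted is a hypothetical helper that returns the text rather than writing a file:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type KeyValue struct {
	Key   string
	Value string
}

// formatSorted returns one "key value" line per pair, ordered by key.
// It sorts a copy, so the caller's slice is left untouched.
func formatSorted(kvs []KeyValue) string {
	sorted := append([]KeyValue(nil), kvs...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Key < sorted[j].Key })

	var b strings.Builder
	for _, kv := range sorted {
		fmt.Fprintf(&b, "%v %v\n", kv.Key, kv.Value)
	}
	return b.String()
}

func main() {
	kvs := []KeyValue{{"pear", "1"}, {"apple", "2"}}
	fmt.Print(formatSorted(kvs))
}
```

This relies on each key appearing only once in the reduce result, which holds here because the worker calls reducef once per key.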

Test

Passed all tests:

$ sudo sh test-mr.sh
*** Starting wc test.
read unix @->/var/tmp/824-mr-0: read: connection reset by peer
2025/02/23 22:42:29 dialing:dial-http unix /var/tmp/824-mr-0: unexpected EOF
2025/02/23 22:42:29 cannot open 
2025/02/23 22:42:29 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
--- wc test: PASS
*** Starting indexer test.
2025/02/23 22:42:32 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
2025/02/23 22:42:32 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
--- indexer test: PASS
*** Starting map parallelism test.
2025/02/23 22:42:39 dialing:dial-http unix /var/tmp/824-mr-0: read unix @->/var/tmp/824-mr-0: read: connection reset by peer
unexpected EOF
2025/02/23 22:42:39 cannot open 
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2025/02/23 22:42:45 cannot open 
--- reduce parallelism test: PASS
2025/02/23 22:42:48 dialing:dial-http unix /var/tmp/824-mr-0: unexpected EOF
*** Starting crash test.
2025/02/23 22:43:13 cannot open 
2025/02/23 22:43:14 cannot open 
2025/02/23 22:43:14 cannot open 
2025/02/23 22:43:15 cannot open 
2025/02/23 22:43:15 cannot open 
2025/02/23 22:43:16 cannot open 
2025/02/23 22:43:16 cannot open 
2025/02/23 22:43:16 cannot open 
2025/02/23 22:43:17 cannot open 
2025/02/23 22:43:17 cannot open 
2025/02/23 22:43:17 cannot open 
2025/02/23 22:43:18 cannot open 
2025/02/23 22:43:18 cannot open 
2025/02/23 22:43:18 cannot open 
2025/02/23 22:43:19 cannot open 
2025/02/23 22:43:19 cannot open 
2025/02/23 22:43:19 cannot open 
2025/02/23 22:43:20 cannot open 
2025/02/23 22:43:20 cannot open 
2025/02/23 22:43:20 cannot open 
2025/02/23 22:43:21 cannot open 
2025/02/23 22:43:21 cannot open 
2025/02/23 22:43:21 cannot open 
2025/02/23 22:43:22 cannot open 
2025/02/23 22:43:22 cannot open 
2025/02/23 22:43:23 cannot open 
unexpected EOF
unexpected EOF
2025/02/23 22:43:25 dialing:dial-http unix /var/tmp/824-mr-0: unexpected EOF
2025/02/23 22:43:25 cannot open 
2025/02/23 22:43:25 cannot open 
--- crash test: PASS
*** PASSED ALL TESTS

Conclusion

Stateless MapReduce and Fault-tolerance

The Map and Reduce functions are stateless. This means workers can execute tasks independently, and a failed worker does not make the whole program fail: its task is simply handed to another worker.

State Machine

We need to save the states, change them, distinguish between them, and do the correct thing in each state.

Program == Logic + Control + Data Structure

The objects (structs) designed are the data structure.
The state machine is the logic.
The code implementation is the control.

Licensed under CC BY-NC-SA 4.0