课程要求是不公开自己的代码的,遵守一下规则。这里简单讲讲思路和遇到的问题
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
实现
首先思考一下“要做什么”,所以做了一下功能点的拆分
1. master收到任务(文件名)后负责拆分任务
2. worker向master申请任务(可能是map也可能是reduce)
3. worker map操作,存入中间文件
4. worker reduce操作,读取中间文件,写入文件
5. crash worker 的处理
6. master判断任务是否已经全部完成;worker结束进程
master是有状态的,设计如下:
type Master struct {
// Your definitions here.
files []string //待处理的文件
mapfTasks map[int]int //key为TaskId,value为task状态0 not start 1 in progress 2 success
mapfAllSuccess bool // mapre任务是否都完成
nReduce int // reduce任务的数量
reducefTasks map[int]int // key为TaskId,value为task状态0 not start 1 in progress 2 success
reducefTaskFileMap map[int][]string // 一个kv表示一个 reduceId 所需要处理所有文件
done bool // 是否完成
mu sync.Mutex // 锁
}
我的实现用了两个rpc:
func (m *Master) AskForTask(args struct{}, reply *AskForTaskReply) error
func (m *Master) ReportTask(args ReportTaskArgs, replay *struct{}) error
方法AskForTask
就是一个worker
向master
申请任务的过程,对应功能点1、2和6。
功能点1和2好理解。这主要是根据mapfTasks
和reducefTasks
这两个任务状态表分发,这两个表key是待处理map任务的id,value是任务状态(未开始、进行中、已完成)。
for taskId, status := range m.mapfTasks {
if status == 0 {
// 分发任务
}
}
为什么会有6是因为考虑到worker
持续循环调用AskForTask
,所以把判断任务全部完成的状态也加在这个方法里,AskForTaskReply
会告知worker
任务全部完成。
if successCnt == len(m.reducefTasks) {
m.done = true
}
功能点3、4在worker.go
下实现。map
操作的核心思想是读取待处理的文件,调用mapf
,写入intermedia file
,通过rpc方法ReportTask
告知master
任务已完成。。reduce
操作的核心思想是根据master指定的intermedia filename
去读取中间文件,然后调用reducef
,然后通知任务完成。
我这样实现的话,有个关键的点是,map在通知任务完成时,要把中间文件的filename也告诉master,因为这个是reduce任务的来源。
任务5我的做法比较简单,在每次分发任务时都新建一个线程延时等待,go m.waitForMapfSuccess(taskId)
。如果等待时间结束,任务还未完成,就把任务从进行中改为未开始。这个做法还是比较粗糙的,不过在这个lab里能够处理。
func (m *Master) waitForMapfSuccess(taskId int) {
time.Sleep(10 * time.Second)
m.mu.Lock()
defer m.mu.Unlock()
if m.mapfTasks[taskId] == 1 {
//log.Printf("mapf taskid %v 超时", taskId)
m.mapfTasks[taskId] = 0
}
}
问题点
环境
我在wsl2环境下实现,而golang版本不是Go1.13
,而是Go1.18.2
。这里还是不建议更改版本,不过因为module相关的一些问题,哪怕我用了go1.13也会触发同样的问题,所以最后我改了下测试文件的编译命令,直接用1.18去跑(逃
...
(cd .. && GO111MODULE=off go build $RACE mrmaster.go) || exit 1
(cd .. && GO111MODULE=off go build $RACE mrworker.go) || exit 1
(cd .. && GO111MODULE=off go build $RACE mrsequential.go) || exit 1
...
这个改动应该是不影响在要求的环境下的测试结果的
race
加-race被提醒有地方会有问题
WARNING: DATA RACE
Write at 0x00c000100220 by goroutine 78:
_/home/...../MIT6.824/src/mr.(*Master).AskForTask()
/home/...../MIT6.824/src/mr/master.go:75 +0x885
Previous read at 0x00c000100220 by main goroutine:
_/home/.....g/MIT6.824/src/mr.(*Master).Done()
/home/...../MIT6.824/src/mr/master.go:134 +0xef
parallelism测试卡死 只能手动Kill
这个问题一开始很困惑,后来发现单跑这个测试是能通过的。然后我发现这个测试会读当前文件夹。而我一开始没有手动删除intermedia file,这个test也不会删之前的intermedia file,我猜测问题点出在这里。事实证明在加入了任务完成后删除intermedia file后,就能通过了。
其他小bug
比如重复问题,debug发现是没有等map全部结束就发出去了reduce任务。
ADLER 1
ADVENTURE 12
ADVENTURES 7
AFTER 2
AGREE 16
AGREEMENT 8
...
ADLER 1
ADVENTURE 12
ADVENTURES 7
AFTER 2
AFTER 2
AGREE 16
AGREEMENT 8
还有crash test超时问题,打了日志后再稍微看下代码就发现是小bug,不多赘述。