mit6.824 系列学习
首先贴一下课程官网,方便大家查阅.
mit6.824
lab1实现: 首先 在 这里 看lab1的要求,我们先执行几个官网给出的命令搭建实验环境
1 2 3 4 5 $ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824 $ cd 6.824 $ ls Makefile src $
这样就能直接拉取到所需的代码
大致目录如下:
然后继续看官网:
We supply you with a simple sequential mapreduce implementation in src/main/mrsequential.go. It runs the maps and reduces one at a time, in a single process. We also provide you with a couple of MapReduce applications: word-count in mrapps/wc.go, and a text indexer in mrapps/indexer.go. You can run word count sequentially as follows:
大致是提供一个MapReduce application demo 你可以照着run一下.
1 2 3 4 5 6 7 8 9 10 $ cd ~/6.824 $ cd src/main $ go build -buildmode=plugin ../mrapps/wc.go $ rm mr-out* $ go run mrsequential.go wc.so pg*.txt $ more mr-out-0 A 509 ABOUT 2 ACT 8 ...
于是我在goland试了一下:
报错贴了Google:原来是Windows环境做不了这个lab啊
稍加思索—>发现异常–>放弃实验跑去linux继续苟
然后就会输出它给你的文本的单词统计(这也是我做的第一个hadoop框架有用的demo.hadoop内核代码也是mapreduce.
clone出来的仓库中只有一个文件夹是和第一个lab mapreduce相关的
mr文件夹!芜湖
关于mapreduce系统框架: MapReduce系统是由一个master进程和多个worker进程组成,master和worker之间是通过RPC(Remote Procedure Call)进行通信,master进程负责给多个worker分配任务,记录任务完成状态,并且需要处理worker奔溃或者超时运行等问题,worker需要处理相应的任务,处理完毕发送报告给master,再请求下一个任务。
master结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type Flag struct { processing bool finished bool } type Master struct { FileNames []string MapFlags []Flag ReduceFlags []Flag MapTaskCnts []int ReduceTaskCnts []int MapAllDone bool ReduceALLDone bool MapNum int ReduceNum int Mut sync.Mutex }
FileNames : pg*.txt这八个文件名
MapFlags :对应的状态
ReduceFlags:同状态
MapTaskCnts:记录map当前的任务序列号,如果map任务发生timeout,HandleTimeout这个函数对map任务进行的processing标志清0,重新分配,当前任务序列号在上个任务号中加1,如果之前发生timeout的任务来报告完成,由于小于当前任务号,HandleWorkerReport函数可以无需记录,直接退出.
ReduceTaskcnts:同上
MapAllDone:任务全部完成,变成true
ReduceAllDone:Reduce任务全部完成为true
MapNum:Map任务数
ReduceNum:任务数
Mut:排它锁
Worker结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type TaskState int const ( MapState TaskState = 0 ReduceState TaskState = 1 StopState TaskState = 2 WaitState TaskState = 3 ) type WorkerTask struct { MapID int ReduceID int ReduceNum int MapNum int MapTaskCnt int ReduceTaskCnt int State TaskState FileName string MapFunction func (string , string ) []KeyValue ReduceFunction func (string , []string ) string }
MapID和ReduceID:Map任务ID和Reduce任务ID
MapNum和ReduceNum:Map的任务总数和Reduce任务总数
MapTaskCnt和ReduceTaskCnt:Map任务序列号和Reduce序列号 State:任务有四种状态,分别是MapState,ReduceState,StopState和WaitState,MapState表示当前需要处理Map任务,ReduceState表示当前需要处理Reduce任务,WaitState表示当前没有需要处理的任务,开始睡眠等待,StopState代表任务已全部完成,可以退出。
FileName:表示Map任务需要的文件名
MapFunction和ReduceFunction:任务根据State需要进行的Map函数或者Reduce函数
Master接口:
创建Master:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func MakeMaster (files []string , nReduce int ) *Master { m := Master{FileNames: files, MapFlags: make ([]Flag, len (files), len (files)), ReduceFlags: make ([]Flag, nReduce, nReduce), MapNum: len (files), ReduceNum: nReduce, MapAllDone: false , ReduceALLDone: false , MapTaskCnts: make ([]int , len (files)), ReduceTaskCnts: make ([]int , nReduce), } m.server() args, reply := NoArgs{}, NoReply{} go m.HandleTimeOut(&args, &reply) return &m }
生成worker task:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func (m *Master) CreateWorkerTask (args *NoArgs, workerTask *WorkerTask) error { m.Mut.Lock() defer m.Mut.Unlock() if !m.MapAllDone { for idx := 0 ; idx < m.MapNum; idx++ { if !m.MapFlags[idx].processing && !m.MapFlags[idx].finished { workerTask.ReduceNum = m.ReduceNum workerTask.MapNum = m.MapNum workerTask.State = MapState workerTask.MapID = idx workerTask.FileName = m.FileNames[idx] m.MapTaskCnts[idx]++ workerTask.MapTaskCnt = m.MapTaskCnts[idx] m.MapFlags[idx].processing = true return nil } } workerTask.State = WaitState return nil } if !m.ReduceALLDone { for idx := 0 ; idx < m.ReduceNum; idx++ { if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished { workerTask.State = ReduceState workerTask.ReduceNum = m.ReduceNum workerTask.MapNum = m.MapNum workerTask.ReduceID = idx m.ReduceTaskCnts[idx]++ workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx] m.ReduceFlags[idx].processing = true return nil } } workerTask.State = WaitState return nil } workerTask.State = StopState return nil }
函数首先会获得互斥锁,然后判断MapAllDone是否为false,为false进入循环遍历,如果某个任务的processing状态和finished状态都为false,说明这个任务可以需要被处理,可以分配,讲配置参数写入到输出参数中,并标志master中当前任务的状态processing为true以及序列号。如果没有任务需要处理,说明map有些任务正在处理,有些已完成。进入等待阶段。判断ReduceALLDone与前面类似。不加以叙述。
处理worker report
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func (m *Master) HandleWorkerReport (wr *WorkerReportArgs, task *NoReply) error { m.Mut.Lock() defer m.Mut.Unlock() if wr.IsSuccess { if wr.State == MapState { if wr.MapTaskCnt == m.MapTaskCnts[wr.MapID] { m.MapFlags[wr.MapID].finished = true m.MapFlags[wr.MapID].processing = false } } else { if wr.ReduceTaskCnt == m.ReduceTaskCnts[wr.ReduceID] { m.ReduceFlags[wr.ReduceID].finished = true m.ReduceFlags[wr.ReduceID].processing = false } } } else { if wr.State == MapState { if m.MapFlags[wr.MapID].finished == false { m.MapFlags[wr.MapID].processing = false } } else { if m.ReduceFlags[wr.ReduceID].finished == false { m.ReduceFlags[wr.ReduceID].processing = false } } } for id := 0 ; id < m.MapNum; id++ { if !m.MapFlags[id].finished { break } else { if id == m.MapNum-1 { m.MapAllDone = true } } } for id := 0 ; id < m.ReduceNum; id++ { if !m.ReduceFlags[id].finished { break } else { if id == m.ReduceNum-1 { m.ReduceALLDone = true } } } return nil }
输入参数有一个标示位,表示任务是否成功,成功判断任务状态以及序列号,如果序列号与master对应上,可以表明这个任务成功,如果对不上,说明这个是个timeout任务,无需处理.如果任务标志位为false,进入错误处理,判断任务是否完成,因为可能是timeout任务标志位为false,未完成让processing置0,CreateWorkerTask可以重新分配。最后判断Map任务和Reduce任务是否相应全部完成,全部完成可以设置MapALLDone和ReduceALLDone为true。
处理timeout:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (m *Master) HandleTimeOut (args *NoArgs, reply *NoReply) error { for { m.Mut.Lock() if m.MapAllDone && m.ReduceALLDone { m.Mut.Unlock() break } time.Sleep(30 * time.Millisecond) if !m.MapAllDone { for idx := 0 ; idx < m.MapNum; idx++ { if m.MapFlags[idx].finished == false { m.MapFlags[idx].processing = false } } } else { for idx := 0 ; idx < m.ReduceNum; idx++ { if m.ReduceFlags[idx].finished == false { m.ReduceFlags[idx].processing = false } } } m.Mut.Unlock() time.Sleep(2000 * time.Millisecond) } return nil }
处理timeout很简单,先判断MapAllDone和ReduceAllDone是不是都为true,都为true都退出即可.然后判断m任务有无完成,没有完成任务的processing清为0,就可以让createWorkerTask重新分配没有完成的任务.最后释放锁,睡眠2S,可以看到Handletimeout函数是以2S为间隔,2s没有完成任务的视为timeout.
Work接口
生成Work
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func Worker (mapf func (string , string ) []KeyValue , reducef func (string , []string ) string ) { wt := WorkerTask{ MapFunction: mapf, ReduceFunction: reducef, } for { wt.GetWorkerTask() if wt.State == MapState { wt.DoMapWork() } else if wt.State == ReduceState { wt.DoReduceWork() } else if wt.State == StopState { break } else if wt.State == WaitState { time.Sleep(300 * time.Millisecond) } } return }
mrworker会调用worker函数,传入map函数和reduce函数,根据函数参数创建一个worker,然后进入循环,调用GetWorkerTask函数,这个函数会调用Master.CreateWorkerTask函数,并传入两个参数,得到任务分配后,讲相应的参数和状态赋值给worker。worker就可以根据状态进入处理相应任务或者睡眠,或者退出。
Map Work
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (wt *WorkerTask) DoMapWork () { file, err := os.Open(wt.FileName) content, err := ioutil.ReadAll(file) file.Close() kvs := wt.MapFunction(wt.FileName, string (content)) intermediate := make ([][]KeyValue, wt.ReduceNum, wt.ReduceNum) for _, kv := range kvs { idx := ihash(kv.Key) % wt.ReduceNum intermediate[idx] = append (intermediate[idx], kv) } for idx := 0 ; idx < wt.ReduceNum; idx++ { intermediateFileName := fmt.Sprintf("mr-%d-%d" , wt.MapID, idx) file, err = os.Create(intermediateFileName) data, _ := json.Marshal(intermediate[idx]) _, err = file.Write(data) file.Close() } wt.ReportWorkerTask(nil ) } func (wt *WorkerTask) ReportWorkerTask (err error) { wra := WorkerReportArgs{ MapID: wt.MapID, ReduceID: wt.ReduceID, State: wt.State, IsSuccess: true , } if wt.State == MapState { wra.MapTaskCnt = wt.MapTaskCnt } else { wra.ReduceTaskCnt = wt.ReduceTaskCnt } wrr := NoReply{} if err != nil { wra.IsSuccess = false } call("Master.HandleWorkerReport" , &wra, &wrr) }
Map work就是读取相应的文件,调用MapFunction生成KeyValue对,然后根据哈希函数得到要讲当前key分配到哪一块中,总共有ReduceNum块,最后根据这么块生成对应map以及reduce块的文件。然后调用ReportWorkerTask报告成功,传入nil表示成功。ReportWorkerTask内部会调用Master.HandleWorkerReport函数来汇报这一执行结果。
Reduce Work
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (wt *WorkerTask) DoReduceWork () { kvsReduce := make (map [string ][]string ) for idx := 0 ; idx < wt.MapNum; idx++ { filename := fmt.Sprintf("mr-%d-%d" , idx, wt.ReduceID) file, err := os.Open(filename) content, err := ioutil.ReadAll(file) file.Close() kvs := make ([]KeyValue, 0 ) err = json.Unmarshal(content, &kvs) for _, kv := range kvs { _, ok := kvsReduce[kv.Key] if !ok { kvsReduce[kv.Key] = make ([]string , 0 ) } kvsReduce[kv.Key] = append (kvsReduce[kv.Key], kv.Value) } } ReduceResult := make ([]string , 0 ) for key, val := range kvsReduce { ReduceResult = append (ReduceResult, fmt.Sprintf("%v %v\n" , key, wt.ReduceFunction(key, val))) } outFileName := fmt.Sprintf("mr-out-%d" , wt.ReduceID) err := ioutil.WriteFile(outFileName, []byte (strings.Join(ReduceResult, "" )), 0644 ) wt.ReportWorkerTask(nil ) }
这里首先读取相同块的所有文件,需要对相同key的内容聚合在一起,然后循环调用ReduceFunction得到Reduce的结果,最后生成输出.
END: 到这里MapReduce实现的就差不多了,关于MapReduce,总结下来是:map对每个文件生成单词和单一数目,分在不同的区块保存,Reduce对不同区块进行统计,得到最终结果.讲这两个过程直接包装起来就是mapreduce.
关于MapReduce的论文,可以阅读这里 .
当然由于是04年的论文,所以现在的翻译资源已经很丰富了(正经人谁去读原版那种单词都认识合成一句话就不知道讲什么的东西呢.
最后放一张过test图片:
—MIT6.824 lab1 end
———–2020.10.11
@copyright ————baijianruoliorz@Github——————————–