http://www.web008.net

美高梅163888:分布式系统

前言

Mit6.824 是本身在上学一些分布式系统方面包车型地铁文化的时候一时看到的,然后就初叶尝试跟课。不能不说,国外的科目难度是真的大,七日的时间依然要学一门 Go 语言,然后还要读诗歌,进而做MapReduce 实验。
鉴于 MPAJERO(MapReduce卡塔尔国 框架须求创立在 DFS(Distributed File System卡塔 尔(阿拉伯语:قطر‎的底蕴上得以完成,所以本实验是由此选取多线程来效仿布满式景况。纵然难度上海南大学学大减少,但是透过该实验,照旧会让大家对 M安德拉 的基本原理有一个相比深厚的认知。
做试验从前大家需求先把卓绝的 MapReduce 杂谈给看了,窝相比建议直接看日文原稿,但要是时间不丰裕的话,能够一直在网络找粤语的翻译版。
刚初叶做那个试验的时候确实是没头没脑,完全不通晓什么出手。后来发掘这么些工程有一个自动化测验文件(test_test.go),每部分实验都会动用那几个测验文件里的函数对代码进行测量试验。大家只要本着那几个测量检验函数稳步倒推,然后补全代码就能够。

介绍

通过 遍布式系统连串小说,大家询问了布满式的有的基本概念,要是写点代码试行一下,那就更加好了。先做个简易的试验练练手,还记得 MapReduce 吗?,此次实验中会塑造一个 MapReduce 库,首要能熟知 Go 语言外加驾驭遍布式系统中的容错机制。首先写个一个简短的 MapReduce 程序,再写二个 Master,它不仅能分配义务给 worker 何况能管理 worker 奉行错误。接口参谋故事集描述。

Part I: Map/Reduce input and output

第一片段是先完成三个顺序版(sequential卡塔 尔(阿拉伯语:قطر‎的M帕杰罗,让大家对 M翼虎的流水生产线有三个大致的认知,并且实现doMap()doReduce() 四个函数。
其含有三个测量试验函数TestSequentialSingle()TestSequentialMany()

试行碰到

不会让您从零开端撸代码啦,还相当的慢 git clone ?

$ git clone git://g.csail.mit.edu/6.824-golabs-2016 6.824
$ cd 6.824
$ ls
Makefile src

MapReduce 代码支持顺序实施和布满式实施。顺序奉行代表 Map 先推行,当有着 Map 职分都成功了再奉行Reduce,这种格局恐怕功效相当的低,不过正如便于调节和测量检验,毕竟串行。分布式实践运维了比超级多worker 线程,他们并行实践 Map 职分,然后施行 Reduce 任务,这种情势效用更加高,当然更难完成和调弄整理。

TestSequentialSingle()

种种map worker管理一个文本,所以map worker的数额就十分文件的数额。
测量试验单个map worker 和 reduce worker。

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

有备无患:熟识代码

mapreduce 包提供了二个回顾的 MapReduce 顺序实行落到实处。应用只要调用 Distributed() 方法就能够运营三个职务,不过要调解的时候恐怕需求调用 Sequential().

mapreduce 的周转流程如下:

  1. 应用层供给提供输入文件,多少个 map 函数,一个 reduce 函数,要运行reduce 任务的数码。

  2. 用那些参数创立三个 master。它会运营贰个 RPC 服务器(master_rpc.go),然后等待 worker 注册(Register())。当有待产生的任务时,schedule() 就能够将职责分配给 worker,同期也会实行 worker 的错误管理。

  3. master 以为每种输入文件应当交付八个 map 任务管理,然后调用 doMap(),无论直接调用 Sequential() 依然经过 RPC 给 worker 发送 DoTask 新闻都会接触这些操作。每当调用 doMap() 时,它都会去读取相应的文书,以文件内容调用 map 函数並且为各类输入文件发出 nReduce 个公文。因此,每一个 map 职分最后会发生 #files x nReduce 个文件。

  4. master 接下来会对各个 reduce 职责起码调用二遍 doReduce()doReduce() 首先会采撷 nReduce 个 map 职分发生的文件,然后在各样文件上实施 reduce 函数,最后产生多少个结果文件。

  5. master 会调用 mr.merge() 方法将上一步发生负有结果文件聚合到二个文本中。

进而本次实验正是到填空题,空是:doMap, doReduce,schedule 和 reduce。

其他的不二等秘书诀基本不供给改造,有时间的钻研研究有扶助明白全部架构。

TestSequentialMany()

此测量检验函数测量试验多个 map worker 和多少个 reduce worker。
其运作逻辑和TestSequentialSingle类似。

func TestSequentialMany(t *testing.T) {
    mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

Part I: Map/Reduce 输入和出口

率先个空 doMap() 函数的职能是读取钦命文件的内容,实践 mapF 函数,将结果保存在新的文书中;而 doReuce() 读取 doMap 的输出文件,实施 reduceF 函数,将结果存在磁盘中。

写完了就测量检验测量试验,测量试验文件(test_test.go)已经写好了。串行格局测量检验可执行:

$ cd 6.824
$ export "GOPATH=$PWD"  
$ cd "$GOPATH/src/mapreduce"
$ setup ggo_v1.5
$ go test -run Sequential mapreduce/...
ok      mapreduce   2.694s

设若你看看的不是 ok,表明还应该有 bug 哦。在 common.go 将 debugEnbale 设置成 true,然后运营 go test -run Sequential mapreduce/... -v,能够观察更详尽的出口:

$ env "GOPATH=$PWD/../../" go test -v -run Sequential mapreduce/...
=== RUN   TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN   TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok      mapreduce   2.672s

Sequential()

测验函数将工作名称,测试文件,reduce 的数量,用户定义的 map 函数,reduce 函数三个实参传递给Sequential()

// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string,
) (mr *Master) {
    mr = newMaster("master")
    go mr.run(jobName, files, nreduce, func(phase jobPhase) {
        switch phase {
        case mapPhase:
            for i, f := range mr.files {
                doMap(mr.jobName, i, f, mr.nReduce, mapF)
            }
        case reducePhase:
            for i := 0; i < mr.nReduce; i++ {
                doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
            }
        }
    }, func() {
        mr.stats = []int{len(files) + nreduce}
    })
    return
}

Sequential()首先获得多个Master 对象的指针,然后接纳函数闭包运转Master.run()

Part II: 单机词频计算

完了了第黄金时代有个别,大家能够起来营造和睦第五个 MapReduce 系统:词频总结器。对的仍然填空题:mapF 和 reduceF,让 wc.go 能够总括出各种单词现身的次数。大家的测量检验文件之中只有俄语,所以三个单词便是连接出现字母,判定三个假名参谋标准库 unicode.IsLetter

测验文件是 6.824/src/main/pg-*.txt,无妨先编写翻译试试:

$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function

自然通过不停,究竟空尚未填呢。mapF 的参数是测量检验文件名和其内容,分割成单词,重返 []mapreduce.KeyValue,KeyValue:单词-频次。轮到 reduceF 函数了,它会针对各种 key(单词) 调用二遍,参数是有个别单词以致那些单词在具备测验文件中的 mapF 结果。

写好了,便可测验:

$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
14.59user 3.78system 0:14.81elapsed

最后的结果保存在 mrtmp.wcseq 文件中。运转 $ rm mrtmp.* 删除全体的中等数据文件。

运行 sort -n -k2 mrtmp.wcseq | tail -10,假如看见的和下边包车型客车大同小异,表达你写对了。

$ 
he: 34077
was: 37044
that: 37495
I: 44502
in: 46092
a: 60558
to: 74357
of: 79727
and: 93990
the: 154024

能够直接运转 $sh ./test-wc.sh

小提示: strings.FieldFunc 能够将三个 string 分割成多少个部分,strconv 包中有函数可将 string 转换到 int。

Master.run()

// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
    schedule func(phase jobPhase),
    finish func(),
) {
    mr.jobName = jobName
    mr.files = files
    mr.nReduce = nreduce

    fmt.Printf("%s: Starting Map/Reduce task %sn", mr.address, mr.jobName)

    schedule(mapPhase)
    schedule(reducePhase)
    finish()
    mr.merge()

    fmt.Printf("%s: Map/Reduce task completedn", mr.address)

    mr.doneChannel <- true
}

Part III: 分布式 MapReduce

MapReduce 让开拓者最爽的地点是无需关心代码是在多台机械并行实行的。但大家后日的落到实处是 master 把 map 和 reduce 职责多少个叁个施行。固然这种完成情势概念上超级轻巧,不过品质并非非常高。接下来大家来兑现两个冒出的 MapReduce,它会调用多少个 worker 线程去实行任务,那样可以更加好地选用多核CPU。当然大家的施行不是真署在多台机器上而是用 channel 去模拟遍布式总结。

由于是出现,所以须求调治者 master 线程,它负担给 worker 分发职责,并且一向守候直到全部 worker 达成职责。为了让咱们的实施特别实事求是,master 只可以通过 RPC 的措施与 worker 通讯。worker 代码(mapreduce/worker.go)已经准备好了,它用来运行 worker。

下多个空是 schedule.go 中的 schedule(),那一个点子担当给 worker 分发 map 和 reduce 职务,当全数任务到位后回到。

master.go 中的 run() 方法会先调用 schedule(),然后调用 merge() 把各样 reduce 任务的输出文件整合到三个文件之中。schedule 只须要报告 worker 输入文件的名字 (mr.files[task]) 和职分 task,worker 本身理解从哪个地方读取也清楚把结果写到哪个文件之中。master 通过 RPC 调用 Worker.DoTask 布告 worker 起头新任务,同一时间还有或许会在 RPC 参数中蕴涵三个 DoTaskArgs 对象。

当多少个 worker 计划达成能够干活时,它会向 master 发送多个 Register RPC,注册的同有的时候常间还大概会把这些 worker 的连锁音信放入 mr.registerChannel。所以 schedule 应该通过读取这些 channel 管理新 worker 的登记。

一时一刻正值运维的 job 消息都在 Master 中定义。注意,master 无需精晓 Map 或 Reduce 具体履行的是怎么着代码;当一个 worker 被 wc.go 创设时就曾经教导了 Map 和 Reduce 函数的音信。

运行 $ go test -run TestBasic mapreduce/... 可开展底工测验。

小提醒: master 应该相互的出殡 RPC 给 worker,这样 worker 能够并发推行职务。可参照 Go RPC 文书档案。

小提醒: master 应该等多个 worker 达成当前职务后当即为它分配两个新职分。等待 master 响应的线程能够用 channel 作为协作工具。Concurrency in Go 有详实的 channel 用法。

小提示: 追踪 bug 最简易的章程就是在代码参预 debug(),然后实行 go test -run TestBasic mapreduce/... > out,out 就能够蕴藏调节和测验消息。最要紧的思忖你原以为的输出和实在的出口为啥非常小器晚成致。

注:当前的代码试运作在一个 Unix 进度中,何况它亦可利用意气风发台机械的多核。假使是要布署在多台机器上,则要更正代码让 worker 通过 TCP 并非 Unix-domain sockets 通讯。此外还索要三个网络文件系统共享存款和储蓄。

doMap()

doMap()doReduce()是须要大家去落到实处的函数。
doMap()的落到实处着重是将客商定义的MapFunc()切割的文本,通过 hash 分到 'nReduce'个切条中去。

func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    // read contents from 'infile'
    dat,err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("doMap: readFile ", err)
    }

    //transfer data into ‘kvSlice’ according to the mapF()
    kvSlice := mapF(inFile, string(dat))

    //divide the ‘kvSlice’ into 'reduceKv' according to the ihash()
    var reduceKv [][]KeyValue // temporary variable which will be written into reduce files
    for i:=0;i<nReduce;i++ {
        s1 := make([]KeyValue,0)
        reduceKv = append(reduceKv, s1)
    }
    for _,kv := range kvSlice{
        hash := ihash(kv.Key) % nReduce
        reduceKv[hash] = append(reduceKv[hash],kv)
    }

    //write 'reduceKv' into ‘nReduce’ JSON files
    for i := 0;i<nReduce;i++ {
        file,err := os.Create(reduceName(jobName,mapTaskNumber,i))
        if err != nil {
            log.Fatal("doMap: create ", err)
        }

        enc := json.NewEncoder(file)
        for _, kv := range reduceKv[i]{
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatal("doMap: json encodem ", err)
            }
        }

        file.Close()

    }
}

Part IV: 管理 worker 施行错误

本小节要让你的 master 能够管理职分实践停业的 worker。由于 MapReduce 中 worker 并未持久状态,所以拍卖起来相对轻巧。假若三个 worker 实行倒闭了,master 向 worker 发送的其他三个 RPC 都恐怕倒闭,譬喻超时。由此,要是失败,master 应该把那一个职分指使给另为二个worker。

三个 RPC 战败并不一定代表 worker 战败,有一点都不小概率是有些 worker 平常运行但 master 不能够获取到它的音信。所以或者会出八个 worker 同偶尔间实行同四个职分。然而因为每一个职务都以幂等的,一个任务被实践三回是没啥影响。

咱俩只要它不会倒闭,所以无需管理 master 战败的意况。让 master 能够容错是周旋费劲的,因为它保持着同心同德的景况,当它失利后大家必要还原它的情事以确定保证它能够持续专业。

test_test.go 还剩最终四个测量检验。测有三个 worker 败北的场所和有为数不菲worker 退步的动静。运维可测验:$ go test -run Failure mapreduce/...

doReduce()

doReduce()尤为重尽管将 key 值相同的 value 打包发送给顾客定义的 ReduceFunc(),获得一个新的 kv对,key 值不改变,而value值则是ReduceFunc()的再次来到值,排序,最终将新的 kv对 切条写入文件。

type ByKey []KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i],a[j] = a[j],a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //read kv slice from the json file
    var kvSlice []KeyValue
    for i := 0;i<nMap;i++{
        //file, _ := os.OpenFile(reduceName(jobName,i,reduceTaskNumber), os.O_RDONLY, 0666)
        file,err := os.Open(reduceName(jobName,i,reduceTaskNumber))
        if err != nil {
            log.Fatal("doReduce: open ", err)
        }
        var kv KeyValue
        dec := json.NewDecoder(file)
        for{
            err := dec.Decode(&kv)
            kvSlice = append(kvSlice,kv)
            if err == io.EOF {
                break
            }
        }
        file.Close()
        /********/
        //此处如果用 defer,可能会造成文件开启过多,造成程序崩溃
        /********/
    }

    //sort the intermediate kv slices by key
    sort.Sort(ByKey(kvSlice))

    //process kv slices in the reduceF()
    var reduceFValue []string
    var outputKv []KeyValue
    var preKey string = kvSlice[0].Key
    for i,kv := range kvSlice{
        if i == (len(kvSlice) - 1) {
            reduceFValue = append(reduceFValue, kv.Value)
            outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
        } else {
                if kv.Key != preKey {
                    outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
                    reduceFValue = make([]string, 0)
                }
                reduceFValue = append(reduceFValue, kv.Value)
        }

        preKey = kv.Key
    }

    //write the reduce output as JSON encoded kv objects to the file named outFile
    file,err := os.Create(outFile)
    if err != nil {
        log.Fatal("doRuduce: create ", err)
    }
    defer file.Close()

    enc := json.NewEncoder(file)
    for _, kv := range outputKv{
        err := enc.Encode(&kv)
        if err != nil {
            log.Fatal("doRuduce: json encode ", err)
        }
    }
}

Part V: 反向索引(可选卡塔尔国

挑战性:

词频总结就算是 MapReduce 最卓绝的二个使用,可是在相近数据应用不平时用。试试写个反向索引应用。

反向索引在计算机科学中利用大范围,特别在文书档案寻觅世界中国和亚洲常常有效。平常的话,贰个反向索引即是叁个从数量到数量特征的投射。举例,在文书档案寻觅中,这几个映射或者就是尤为重要词与文档名称的映射。

main/ii.go 的完全布局跟 wc.go 相像。改正 mapF 和 reduceF 让它们创制反向索引。运行 ii.go 应该出口多少个元组列表,每风度翩翩行的格式如下:

$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq
A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
ABRAHAM: 1 pg-dracula.txt
ABSOLUTE: 1 pg-les_miserables.txt

您的代码应该经过 test-ii.sh 的测量检验:

$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10
women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt

Part II: Single-worker word count

其次有个别是兑现mapF()reduceF()函数,来促成通过各种MLX570总结词频的法力。
比较轻松,就直接放代码了。

func mapF(filename string, contents string) []mapreduce.KeyValue {
    f := func(c rune) bool {
        return !unicode.IsLetter(c)
    }
    var strSlice []string = strings.FieldsFunc(contents,f)
    var kvSlice []mapreduce.KeyValue
    for _,str := range strSlice {
        kvSlice = append(kvSlice, mapreduce.KeyValue{str, "1"})
    }

    return kvSlice
}

func reduceF(key string, values []string) string {
    var cnt int64
    for _,str := range values{
        temp,err := strconv.ParseInt(str,10,64)
        if(err != nil){
            fmt.Println("wc :parseint ",err)
        }
        cnt += temp
    }
    return strconv.FormatInt(cnt,10)
}

因而全方位测量检验

运维 src/main/test-mr.sh 可测量检验本次试验的全数内容。假若全勤由此,能够观察:

$ sh ./test-mr.sh
==> Part I
ok      mapreduce   3.053s

==> Part II
Passed test

==> Part III
ok      mapreduce   1.851s

==> Part IV
ok      mapreduce   10.650s

==> Part V (challenge)
Passed test

Part III: Distributing MapReduce tasks && Part IV: Handling worker failures

其三有个别和第四某些能够协作来做,重要是水到渠成schedule(),实现二个因而线程并发推行map worker 和 reduce worker 的 MEscort 框架。框架通过 RPC 来效仿布满式总括,并要带有 worker 的容灾功用。

郑重声明:本文版权归美高梅163888所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。