MapReduce 的 shuffle 机制

因为 MapReduce 确保每一个 reducer 的输入都是按键排序的,所以在 map 处理完数据以后传给 reducer 的这个过程当中须要进行一系列操做,这个操做过程就是 shuffle。在《hadoop权威指南》中指出,shuffle 是 MapReduce 的 “心脏”,了解 shuffle 工做机制有助于咱们优化 MapReduce 程序,接下来咱们就来看看它的运行机制。缓存

shuffle 流程

先用一张图表示 shuffle 的整个过程。从图中咱们能够看到 shuffle 流程主要是对 map 的数据进行排序、分组发送给 reduce 后再进行合并的一个过程,咱们将分 map 和 reduce 两个部分来说解 shuffle 的流程。网络

shfulle机制

map 端 shuffle

map 任务开始产生数据时,会先将这些数据存储在一个 内存缓冲区 中,这个缓冲区大小默认为 100MB,能够经过设置 mapreduce.task.io.sort.mb 来改变其大小。因为 hadoop 处理的是海量数据,100MB 的内存显然是不够用的,所以达到必定 阈值 时(默认为 0.8,能够经过设置 mapreduce.map.sort.spill.percent 来改变其大小),会将内存中的内容溢出(spill)到磁盘当中,溢出的路径是由 mapreduce.cluster.local.dir 属性指定的。在溢出到磁盘的过程当中,若是缓冲区中还有空间,map 程序会继续输出数据到缓冲区中,若是没有空间的话,map 输出程序则会阻塞直到数据写入到磁盘后。函数

在上图中 buffer in memory(输出到缓存中) 和 merge on disk(合并到磁盘) 这两个步骤中间还有一个 分区、排序 的步骤。分区能达到跟分组相似的效果,例如读取一个含有大量电话号码的数据时,把 138 的分为一组,把 135 分为一组。这个效果能够经过自定一个类继承 Partitioner,而后在 Job 中调用 setPartitionerClass 方法设置分区类来完成。在每一个分区中,后台线程按照键的值对数据在内存中进行排序,若是有一个 combiner 方法,则在排序完成以后运行它。combiner 方法会使 map 输出更紧凑,减小写到磁盘中的数据和传给 reducer 的数据。oop

通常状况下,map 的输出结果并不会进行压缩,因为数据量大,对网络资源的耗费很大,为了对 mapreduce 程序进行优化,咱们能够将 mapreduce.map.output.compress 属性设置为 true,这样当 map 将数据写到磁盘时就会对数据进行压缩。具体的压缩格式能够经过 mapreduce.map.output.compress.codec 属性来设置。当全部记录都写完以后,map 会合并所有的溢出文件为一个分区且排序的文件传给 reduce。优化

reduce 端 shuffle

reducer 经过 HTTP 的方式获取 map 的的输出数据,这是复制阶段。reducer 在复制阶段把 Map 输出复制到 Reducer 的内存或磁盘,一个 Map 任务完成后,Reduce 就开始复制输出。复制完全部的 map 输出以后,reducer 对这些数据进行合并,使它们仍然保持有序。合并完成以后,直接将这些数据输入到 reduce 函数中,从而省略一次写入磁盘的时间。至此,整个 shuffle 流程就完成了。线程

以上即是我对 MR shuffle 机制的理解,若是其中有错,欢迎指出。3d