MapReduce 原理之Shuffle机制

1.Shuffle机制
    Mapreduce 确保每个 reducer 的输入都是按键排序的。系统执行排序的过程(即将 map 输出作为输入传给 reducer )称为 shuffle


2.Partition分区
(1) 问题引出:要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(2) 默认partition分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
//默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
(3) 自定义Partitioner步骤
    ① 自定义类继承Partitioner,重写getPartition()方法
 public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

// 1 获取电话号码的前三位
        String preNum = key.toString().substring(0, 3);
        
        int partition = 4;
        
        // 2 判断是哪个省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}
    ② 在job驱动中,设置自定义partitioner:
 job.setPartitionerClass(CustomPartitioner.class);
    ③ 自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5);
(4) 注意:
    如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
    如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
    如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;
    例如:假设自定义分区数为5,则
job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
job.setNumReduceTasks(2);会报错
job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

3.WritableComparable排序
    排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
    对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。
    对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。
每个阶段的默认排序
(1) 排序的分类:
    ① 部分排序:
        MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
    ② 全排序:
        如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。
        替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。
    ③ 辅助排序:(GroupingComparator分组)
        Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
    ④ 二次排序:
        在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
(2) 自定义排序WritableComparable
    bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序
@Override
public int compareTo(FlowBean o) {
    // 倒序排列,从大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

4.Combiner合并
(1) combiner是MR程序中Mapper和Reducer之外的一种组件。
(2) combiner组件的父类就是Reducer。
(3) combiner和reducer的区别在于运行的位置:
    Combiner是在每一个maptask所在的节点运行;
    Reducer是接收全局所有Mapper的输出结果;
(4) combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
(5) combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。
    Mapper
        3 5 7 ->(3+5+7)/3=5
        2 6 ->(2+6)/2=4
    Reducer
        (3+5+7+2+6)/5=23/5    不等于    (5+4)/2=9/2
(6) 自定义Combiner实现步骤:
    ① 自定义一个combiner继承Reducer,重写reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // 1 汇总操作
        int count = 0;
        for(IntWritable v :values){
            count = v.get();
        }
        // 2 写出
        context.write(key, new IntWritable(count));
    }
}
    ② 在job驱动类中设置:  
job.setCombinerClass(WordcountCombiner.class);