在MR编程中,最典型的业务就是求sum,max,min,avg,distinct, group by 还有 join 等操做的实现了。事实上,不管是那种业务。 MapReduce的编程框架已经决定了要把mapper阶段计算出来的key-value会按照key作组划分。因此reduceTask当中的reduce方法,其实接收到的参数就是key相同的一组key-value,而后根据业务逻辑作规约。好比distinct操做。若是须要按照某个字段值进行去重,那么只须要把该要进行去重的字段作key就OK,而后在reducer阶段,再在每一组中输出一个key-value值便可。java
下面以一个简单的单词去重做为例子:apache
直接上源码,部分解释在源码中,请细看:编程
package com.ghgj.mazh.mapreduce.distinct; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 做者: 马中华:http://blog.csdn.net/zhongqi2513 * 日期: 2017年10月25日下午12:34:25 * * 描述:单词去重 * */ public class DistinctWordMR { public static void main(String[] args) throws Exception { // 指定hdfs相关的参数 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop06:9000"); System.setProperty("HADOOP_USER_NAME", "hadoop"); Job job = Job.getInstance(conf); // 设置jar包所在路径 job.setJarByClass(DistinctWordMR.class); // 指定mapper类和reducer类 job.setMapperClass(DistinctWordMRMapper.class); job.setReducerClass(DistinctWordMRReducer.class); // 指定maptask的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 指定reducetask的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 指定该mapreduce程序数据的输入和输出路径 // Path inputPath = new Path("d:/wordcount/input"); // Path outputPath = new Path("d:/wordcount/output"); Path inputPath = new Path("/wc/input"); Path outputPath = new Path("/wc/output"); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 最后提交任务 boolean waitForCompletion = job.waitForCompletion(true); System.exit(waitForCompletion ? 0 : 1); } /** * 做者: 马中华:http://blog.csdn.net/zhongqi2513 * 日期: 2017年10月25日下午12:39:34 * * 描述:单词去重MR中的mapper组件。 读取文件而后切分出单词 */ private static class DistinctWordMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private Text outkey = new Text(); /** * 在单词计数的场景中。 把单词做为key输出便可, 不用输出value */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(" "); for (String word : split) { outkey.set(word); context.write(outkey, NullWritable.get()); } } } /** * 做者: 马中华:http://blog.csdn.net/zhongqi2513 * 日期: 2017年10月25日下午12:39:20 * * 描述:单词去重的MR程序的reducer组件 */ private static class DistinctWordMRReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { /** * reduce方法没调用一次,就接收到一组相同的单词。因此,在此由于是去重的业务,因此直接输出一次key便可。就表示这一组单词就取一个。就至关于实现去重的业务 */ context.write(key, NullWritable.get()); } } }
下面是程序接收到的数据:app
hello huangbo hello xuzheng hello wangbaoqiang one two three four five one two three four one two three one two hello hi
five four hello hi huangbo one three two wangbaoqiang xuzheng
一、MapReduce编程框架中,必定会对mapper阶段输出的key-value排序,会按照key-value中的key排序,默认按照天然顺序排序。并且只会按照key进行排序框架
二、若是一个MapReduce程序,没有reducer阶段,那么mapper和reducer中间的shuffle过程就没有,因此这种状况,是不会排序的,也就是说,只要一个MR程序有reducer阶段,那么该程序必定会对key进行排序。ide
问题:若是想要进行排序的字段在value中呢,因为MR编程模型只会对key进行排序,因此要怎么实现呢。?oop