MR 运行原理

1、Map-Reduce的逻辑过程

假设我们需要处理一批有关天气的数据,其格式如下:

  • 按照ASCII码存储,每行一条记录
  • 每一行字符从0开始计数,第15个到第18个字符为年
  • 第25个到第29个字符为温度,其中第25位是符号+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

现在需要统计出每年的最高温度。

Map-Reduce主要包括两个步骤:Map和Reduce

每一步都有key-value对作为输入和输出:

  • map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
  • map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应

对于上面的例子,在map过程,输入的key-value对如下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)

在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其逻辑过程可用如下图表示:

image

下图大概描述了Map-Reduce的Job运行的基本原理:

image

 

下面我们讨论JobConf,其有很多的项可以进行配置:

  • setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
  • setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
  • setMapperClass:设置Mapper,默认为IdentityMapper
  • setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
  • setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
  • setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
  • setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
  • setReducerClass:设置Reducer,默认为IdentityReducer
  • setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
  • FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
  • FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:

public class MaxTemperature {

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {

            System.err.println("Usage: MaxTemperature <input path> <output path>");

            System.exit(-1);

        }

        JobConf conf = new JobConf(MaxTemperature.class);

        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);

        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }

}

3、Map-Reduce数据流(data flow)

Map-Reduce的处理过程主要涉及以下四个部分:

  • 客户端Client:用于提交Map-reduce任务job
  • JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
  • TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
  • HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件

image

3.1、任务提交

JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。

  • 向JobTracker请求一个新的job ID
  • 检测此job的output配置
  • 计算此job的input splits
  • 将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
  • 通知JobTracker此Job已经可以运行了

提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。

 

3.2、任务初始化

 

当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。

初始化首先创建一个对象来封装job运行的tasks, status以及progress。

在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。

其为每个input split创建一个map task。

每个task被分配一个ID。

 

3.3、任务分配

 

TaskTracker周期性的向JobTracker发送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。

在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。

TaskTracker有固定数量的位置来运行map task或者reduce task。

默认的调度器对待map task优先于reduce task

当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。

 

3.4、任务执行

 

TaskTracker被分配了一个task,下面便要运行此task。

首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。

TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。

其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。

其三,其创建一个TaskRunner来运行task。

TaskRunner创建一个新的JVM来运行task。

被创建的child JVM和TaskTracker通信来报告运行进度。

 

3.4.1、Map的过程

MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。

map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。

当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。

在写入硬盘之前,内存中的数据通过partitioner分成多个partition。

在同一个partition中,背景线程会将数据按照key在内存中排序。

每次从内存向硬盘flush数据,都生成一个新的spill文件。

当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。

reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。

3.4.2、Reduce的过程

当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。

对于一个job,JobTracker知道TaskTracer和map输出的对应关系。

reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。

reduce task需要其对应的partition的所有的map输出。

reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。

reduce task中有多个copy线程,可以并行拷贝map输出。

当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。

当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。

最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。

 

image

 

3.5、任务结束

 

当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。

当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。


------------------------------------------------------------------------------------------------------------------------------------

市面上的hadoop权威指南一类的都是老版本的书籍了,索性学习并翻译了下最新版的Hadoop:The Definitive Guide, 4th Edition与大家共同学习。

  我们通过提交jar包,进行MapReduce处理,那么整个运行过程分为五个环节:

  1、向client端提交MapReduce job.

  2、随后yarn的ResourceManager进行资源的分配.

  3、由NodeManager进行加载与监控containers.

  4、通过applicationMaster与ResourceManager进行资源的申请及状态的交互,由NodeManagers进行MapReduce运行时job的管理.

  5、通过hdfs进行job配置文件、jar包的各节点分发。

Job 提交过程

  job的提交通过调用submit()方法创建一个JobSubmitter实例,并调用submitJobInternal()方法。整个job的运行过程如下:

  1、向ResourceManager申请application ID,此ID为该MapReduce的jobId。

  2、检查output的路径是否正确,是否已经被创建。

  3、计算input的splits。

  4、拷贝运行job 需要的jar包、配置文件以及计算input的split 到各个节点。

  5、在ResourceManager中调用submitAppliction()方法,执行job

Job 初始化过程

  1、当resourceManager收到了submitApplication()方法的调用通知后,scheduler开始分配container,随之ResouceManager发送applicationMaster进程,告知每个nodeManager管理器。

  2、由applicationMaster决定如何运行tasks,如果job数据量比较小,applicationMaster便选择将tasks运行在一个JVM中。那么如何判别这个job是大是小呢?当一个job的mappers数量小于10个只有一个reducer或者读取的文件大小要小于一个HDFS block时,(可通过修改配置项mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 进行调整)

  3、在运行tasks之前,applicationMaster将会调用setupJob()方法,随之创建output的输出路径(这就能够解释,不管你的mapreduce一开始是否报错,输出路径都会创建)

Task 任务分配

  1、接下来applicationMaster向ResourceManager请求containers用于执行map与reduce的tasks(step 8),这里map task的优先级要高于reduce task,当所有的map tasks结束后,随之进行sort(这里是shuffle过程后面再说),最后进行reduce task的开始。(这里有一点,当map tasks执行了百分之5%的时候,将会请求reduce,具体下面再总结)

  2、运行tasks的是需要消耗内存与CPU资源的,默认情况下,map和reduce的task资源分配为1024MB与一个核,(可修改运行的最小与最大参数配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)

Task 任务执行

  1、这时一个task已经被ResourceManager分配到一个container中,由applicationMaster告知nodemanager启动container,这个task将会被一个主函数为YarnChild的java application运行,但在运行task之前,首先定位task需要的jar包、配置文件以及加载在缓存中的文件

  2、YarnChild运行于一个专属的JVM中,所以任何一个map或reduce任务出现问题,都不会影响整个nodemanager的crash或者hang

  3、每个task都可以在相同的JVM task中完成,随之将完成的处理数据写入临时文件中。

Mapreduce数据流

运行进度与状态更新

  1、MapReduce是一个较长运行时间的批处理过程,可以是一小时、几小时甚至几天,那么Job的运行状态监控就非常重要。每个job以及每个task都有一个包含job(running,successfully completed,failed)的状态,以及value的计数器,状态信息及描述信息(描述信息一般都是在代码中加的打印信息),那么,这些信息是如何与客户端进行通信的呢?

  2、当一个task开始执行,它将会保持运行记录,记录task完成的比例,对于map的任务,将会记录其运行的百分比,对于reduce来说可能复杂点,但系统依旧会估计reduce的完成比例。当一个map或reduce任务执行时,子进程会持续每三秒钟与applicationMaster进行交互

Job 完成

   最终,applicationMaster会收到一个job完成的通知,随后改变job的状态为successful。最终,applicationMaster与task containers被清空。

 

Shuffle与Sort

  从map到reduce的过程,被称之为shuffle过程,MapReduce使到reduce的数据一定是经过key的排序的,那么shuffle是如何运作的呢?

  当map任务将数据output时,不仅仅是将结果输出到磁盘,它是将其写入内存缓冲区域,并进行一些预分类

  

  1、The Map Side

  首先map任务的output过程是一个环状的内存缓冲区,缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存的大小到达一定比例,默认为80%(可通过mapreduce.map.sort.spill.percent配置项修改),便开始写入磁盘。

  在写入磁盘之前,线程将会指定数据写入与reduce相应的patitions中,最终传送给reduce.在每个partition中,后台线程将会在内存中进行Key的排序,(如果代码中有combiner方法,则会在output时就进行sort排序,这里,如果只有少于3个写入磁盘的文件,combiner将会在outputfile前启动,如果只有一个或两个,那么将不会调用)

  这里将map输出的结果进行压缩会大大减少磁盘IO与网络传输的开销(配置参数mapreduce.map .output.compress 设置为true,如果使用第三方压缩jar,可通过mapreduce.map.output.compress.codec进行设置)

   随后这些paritions输出文件将会通过HTTP发送至reducers,传送的最大启动线程通过mapreduce.shuffle.max.threads进行配置。

  2、The Reduce Side

  首先上面每个节点的map都将结果写入了本地磁盘中,现在reduce需要将map的结果通过集群拉取过来,这里要注意的是,需要等到所有map任务结束后,reduce才会对map的结果进行拷贝,由于reduce函数有少数几个复制线程,以至于它可以同时拉取多个map的输出结果。默认的为5个线程(可通过修改配置mapreduce.reduce.shuffle.parallelcopies来修改其个数)

  这里有个问题,那么reducers怎么知道从哪些机器拉取数据呢? 

  当所有map的任务结束后,applicationMaster通过心跳机制(heartbeat mechanism),由它知道mapping的输出结果与机器host,所以reducer会定时的通过一个线程访问applicationmaster请求map的输出结果

  Map的结果将会被拷贝到reduce task的JVM的内存中(内存大小可在mapreduce.reduce.shuffle.input.buffer.percent中设置)如果不够用,则会写入磁盘。当内存缓冲区的大小到达一定比例时(可通过mapreduce.reduce.shuffle.merge.percent设置)或map的输出结果文件过多时(可通过配置mapreduce.reduce.merge.inmen.threshold),将会除法合并(merged)随之写入磁盘。

  这时要注意,所有的map结果这时都是被压缩过的,需要先在内存中进行解压缩,以便后续合并它们。(合并最终文件的数量可通过mapreduce.task.io.sort.factor进行配置) 最终reduce进行运算进行输出。

参考文献:《Hadoop:The Definitive Guide, 4th Edition》