MapReduce基础编程
MapReduce编程指南
引入数据分区partition
默认情况下MR输出文件个数
在默认情况下,不管map阶段有多少个并发执行task,到reduce阶段,所有的结果都将有一个reduce来处理,并且最终结果输出到一个文件中。
此时,MapReduce的执行流程如下所示:
修改reducetask个数
在MapReduce程序的驱动类中,通过job提供的方法,可以修改reducetask的个数。
默认情况下不设置,reducetask个数为1,结果输出到一个文件中。
使用api修改reducetask个数之后,输出结果文件的个数和reducetask个数对应。比如设置为6个,此时的输出结果如下所示:
此时,MapReduce的执行流程如下所示:
数据分区概念
当MapReduce中有多个reducetask执行的时候,此时maptask的输出就会面临一个问题:究竟将自己的输出数据交给哪一个reducetask来处理,这就是所谓的数据分区(partition)问题。
默认分区规则
MapReduce默认分区规则是HashPartitioner。跟map输出的数据key有关。
当然用户也可以自己自定义分区规则。后面案例中说。
MapReduce执行流程
执行流程图
Map阶段执行过程
- 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)
- 第二阶段是对切片中的数据按照一定的规则解析成
对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat) - 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个
,调用一次map方法。每次调用map方法会输出零个或多个键值对。 - 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。
- 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。
- 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。
Reduce阶段执行过程
- 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。
- 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
- 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。
MapReduce中key的重要性
在MapReduce编程中,核心是牢牢把握住每个阶段的输入输出key是什么。
因为mr中很多默认行为都跟key相关。
排序:key的字典序a-z 正序
分区:key.hashcode % reducetask 个数
分组:key相同的分为一组
最重要的是,如果觉得默认的行为不满足业务需求,MapReduce还支持自定义排序、分区、分组的规则,这将使得编程更加灵活和方便。
美国新冠疫情COVID-19统计
现有美国2021-1-28号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:
2021-01-28,Juneau City and Borough,Alaska,02110,1108,3
2021-01-28,Kenai Peninsula Borough,Alaska,02122,3866,18
2021-01-28,Ketchikan Gateway Borough,Alaska,02130,272,1
2021-01-28,Kodiak Island Borough,Alaska,02150,1021,5
2021-01-28,Kusilvak Census Area,Alaska,02158,1099,3
2021-01-28,Lake and Peninsula Borough,Alaska,02164,5,0
2021-01-28,Matanuska-Susitna Borough,Alaska,02170,7406,27
2021-01-28,Nome Census Area,Alaska,02180,307,0
2021-01-28,North Slope Borough,Alaska,02185,973,3
2021-01-28,Northwest Arctic Borough,Alaska,02188,567,1
2021-01-28,Petersburg Borough,Alaska,02195,43,0
字段含义如下:date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)。
MapReduce自定义对象
需求
分析
自定义一个对象CovidCountBean,用于封装每个县的确诊病例数和死亡病例数。注意需要实现hadoop的序列化机制。
以州state作为map阶段输出的key,以CovidCountBean作为value,这样经过MapReduce的默认排序分组规则,属于同一个州的数据就会变成一组进行reduce处理,进行累加即可得出每个州累计确诊病例。
代码实现
自定义JavaBean
public class CovidCountBean implements Writable{
private long cases;//确诊病例数
private long deaths;//死亡病例数
public CovidCountBean() {
}
public CovidCountBean(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public void set(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public long getCases() {
return cases;
}
public void setCases(long cases) {
this.cases = cases;
}
public long getDeaths() {
return deaths;
}
public void setDeaths(long deaths) {
this.deaths = deaths;
}
/**
* 序列化方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化方法 注意顺序
*/
@Override
public void readFields(DataInput in) throws IOException {
this.cases = in.readLong();
this.deaths =in.readLong();
}
@Override
public String toString() {
return cases +"\t"+ deaths;
}
}
Mapper类
public class CovidSumMapper extends Mapper<LongWritable, Text, Text, CovidCountBean> {
Text outKey = new Text();
CovidCountBean outValue = new CovidCountBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//州
outKey.set(fields[2]);
//Covid数据 确诊病例 死亡病例
outValue.set(Long.parseLong(fields[fields.length-2]),Long.parseLong(fields[fields.length-1]));
//map输出结果
context.write(outKey,outValue);
}
}
Reducer类
public class CovidSumReducer extends Reducer<Text, CovidCountBean,Text,CovidCountBean> {
CovidCountBean outValue = new CovidCountBean();
@Override
protected void reduce(Text key, Iterable<CovidCountBean> values, Context context) throws IOException, InterruptedException {
long totalCases = 0;
long totalDeaths =0;
//累加统计
for (CovidCountBean value : values) {
totalCases += value.getCases();
totalDeaths +=value.getDeaths();
}
outValue.set(totalCases,totalDeaths);
context.write(key,outValue);
}
}
程序驱动类
public class CovidSumDriver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidSumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidSumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidSumMapper.class);
job.setReducerClass(CovidSumReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CovidCountBean.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CovidCountBean.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//判断输出路径是否存在 如果存在删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
代码执行及结果
可以采用本地模式运行,也可以打成jar包在yarn集群上运行。
MapReduce自定义排序
需求
将美国2021-01-28,每个州state的确诊案例数进行倒序排序。
分析
如果你的需求中需要根据某个属性进行排序 ,不妨把这个属性作为key。因为MapReduce中key有默认排序行为的。但是需要进行如下考虑:
如果你的需求是正序,并且数据类型是Hadoop封装好的基本类型。这种情况下不需要任何修改,直接使用基本类型作为key即可。因为Hadoop封装好的类型已经实现了排序规则。
比如:LongWritable类型:
如果你的需求是倒序,或者数据类型是自定义对象。需要重写排序规则。需要对象实现Comparable接口,重写ComparTo方法。
compareTo方法用于将当前对象与方法的参数进行比较。
如果指定的数与参数相等返回0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
例如:o1.compareTo(o2);
返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。
代码实现
自定义对象排序
public class CovidCountBean implements WritableComparable<CovidCountBean> {
private long cases;//确诊病例数
private long deaths;//死亡病例数
public CovidCountBean() {
}
public CovidCountBean(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public void set(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public long getCases() {
return cases;
}
public void setCases(long cases) {
this.cases = cases;
}
public long getDeaths() {
return deaths;
}
public void setDeaths(long deaths) {
this.deaths = deaths;
}
/**
* 序列化方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化方法 注意顺序
*/
@Override
public void readFields(DataInput in) throws IOException {
this.cases = in.readLong();
this.deaths =in.readLong();
}
@Override
public String toString() {
return cases +"\t"+ deaths;
}
/**
* 排序比较器 本业务中根据确诊案例数倒序排序
*/
@Override
public int compareTo(CovidCountBean o) {
return this.cases - o.getCases()> 0 ? -1:(this.cases - o.getCases() < 0 ? 1 : 0);
}
}
Mapper类
public class CovidSortSumMapper extends Mapper<LongWritable, Text, CovidCountBean,Text> {
CovidCountBean outKey = new CovidCountBean();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
outKey.set(Long.parseLong(fields[1]),Long.parseLong(fields[2]));
outValue.set(fields[0]);
context.write(outKey,outValue);
}
}
Reducer类
public class CovidSortSumReducer extends Reducer<CovidCountBean, Text,Text,CovidCountBean> {
@Override
protected void reduce(CovidCountBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text outKey = values.iterator().next();
context.write(outKey,key);
}
}
驱动程序类
public class CovidSortSumDriver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidSortSumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidSortSumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidSortSumMapper.class);
job.setReducerClass(CovidSortSumReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CovidCountBean.class);
job.setMapOutputValueClass(Text.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CovidCountBean.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//判断输出路径是否存在 如果存在删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
执行结果
分区个数和reducetask个数关系
正常情况下: 分区的个数 = reducetask个数。
- 分区的个数 > reducetask个数 程序执行报错
- 分区的个数 < reducetask个数 有空文件产生
MapReduce Combiner
每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。
combiner中文叫做数据规约。数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。
combiner是MR程序中Mapper和Reducer之外的一种组件,默认情况下不启用。
combiner组件的父类就是Reducer,combiner和reducer的区别在于运行的位置:
combiner是在每一个maptask所在的节点运行
Reducer是接收全局所有Mapper的输出结果
combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量
具体实现步骤:
1、 自定义一个combiner继承Reducer,重写reduce方法
2、 在job中设置: job.setCombinerClass(CustomCombiner.class)combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。下述场景禁止使用,不仅优化了数据量,还改变了最终的结果
- 业务和数据个数相关的。
- 业务和整体排序相关的。
MapReduce自定义分组
分组概念和默认分组规则
分组在发生在reduce阶段,决定了同一个reduce中哪些数据将组成一组去调用reduce方法处理。默认分组规则是:key相同的就会分为一组(前后两个key直接比较是否相等)。
需要注意的是,在reduce阶段进行分组之前,因为进行数据排序行为,因此排序+分组将会使得key一样的数据一定被分到同一组,一组去调用reduce方法处理。
此外,用户还可以自定义分组规则:
写类继承 WritableComparator,重写Compare方法。
只要Compare方法返回为0,MapReduce框架在分组的时候就会认为前后两个相等,分为一组。还需要在job对象中进行设置 才能让自己的重写分组类生效。job.setGroupingComparatorClass(xxxx.class);
需求
找出美国2021-01-28,每个州state的确诊案例数最多的县county是哪一个。该问题也是俗称的TopN问题。
分析
自定义对象,在map阶段将“州state和累计确诊病例数cases”作为key输出,重写对象的排序规则,首先根据州的正序排序,如果州相等,按照确诊病例数cases倒序排序,发送到reduce。
在reduce端利用自定义分组规则,将州state相同的分为一组,然后取第一个即是最大值。
代码实现
自定义对象
public class CovidBean implements WritableComparable<CovidBean> {
private String state;//州
private String county;//县
private long cases;//确诊病例
public CovidBean() {
}
public CovidBean(String state, String county, long cases) {
this.state = state;
this.county = county;
this.cases = cases;
}
public void set (String state, String county, long cases) {
this.state = state;
this.county = county;
this.cases = cases;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getCounty() {
return county;
}
public void setCounty(String county) {
this.county = county;
}
public long getCases() {
return cases;
}
public void setCases(long cases) {
this.cases = cases;
}
@Override
public String toString() {
return "CovidBean{" +
"state='" + state + '\'' +
", county='" + county + '\'' +
", cases=" + cases +
'}';
}
//todo 排序规则 根据州state正序进行排序 如果州相同 则根据确诊数量cases倒序排序
@Override
public int compareTo(CovidBean o) {
int result ;
int i = state.compareTo(o.getState());
if ( i > 0) {
result =1;
} else if (i <0 ) {
result = -1;
} else {
// 确诊病例数倒序排序
result = cases > o.getCases() ? -1 : 1;
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(state);
out.writeUTF(county);
out.writeLong(cases);
}
@Override
public void readFields(DataInput in) throws IOException {
this.state = in.readUTF();
this.county = in.readUTF();
this.cases = in.readLong();
}
}
Mapper类
public class CovidTop1Mapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {
CovidBean outKey = new CovidBean();
NullWritable outValue = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//封装数据: 州 县 确诊病例
outKey.set(fields[2],fields[1],Long.parseLong(fields[4]));
context.write(outKey,outValue);
}
}
Reducer类
public class CovidTop1Reducer extends Reducer<CovidBean, NullWritable,CovidBean,NullWritable> {
@Override
protected void reduce(CovidBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//不遍历迭代器,此时key就是分组中的第一个key 也就是该州确诊病例数最多的县对应的数据
context.write(key,NullWritable.get());
}
}
自定义分组
public class CovidGroupingComparator extends WritableComparator {
protected CovidGroupingComparator(){
super(CovidBean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
CovidBean aBean = (CovidBean) a;
CovidBean bBean = (CovidBean) b;
return aBean.getState().compareTo(bBean.getState());
}
}
驱动程序类
public class CovidTop1Driver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidTop1Driver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidTop1Driver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidTop1Mapper.class);
job.setReducerClass(CovidTop1Reducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CovidBean.class);
job.setMapOutputValueClass(NullWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(CovidBean.class);
job.setOutputValueClass(NullWritable.class);
//todo 设置自定义分组
job.setGroupingComparatorClass(CovidGroupingComparator.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//判断输出路径是否存在 如果存在删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
执行结果
自定义分组扩展:topN问题
需求
找出美国2021-01-28,每个州state的确诊案例数最多的县county前3个。Top3问题。
分析
自定义对象,在map阶段将“州state和累计确诊病例数cases”作为key输出,重写对象的排序规则,首先根据州的正序排序,如果州相等,按照确诊病例数cases倒序排序,发送到reduce。
在reduce端利用自定义分组规则,将州state相同的分为一组,然后遍历取值,取出每组中的前3个即可。
为了验证验证结果方便,可以在输出的时候以cases作为value,实际上为空即可,value并不实际意义。
代码实现
自定义对象、自定义分组类
这两个和上述的Top1一样,此处就不再重复编写。可以直接使用。
Mapper类
public class CovidTopNMapper extends Mapper<LongWritable, Text, CovidBean,LongWritable> {
CovidBean outKey = new CovidBean();
LongWritable outValue = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//封装数据: 州 县 确诊病例
outKey.set(fields[2],fields[1],Long.parseLong(fields[4]));
outValue.set(Long.parseLong(fields[4]));
context.write(outKey,outValue);
}
}
Reducer类
public class CovidTopNReducer extends Reducer<CovidBean, LongWritable,CovidBean,LongWritable> {
@Override
protected void reduce(CovidBean key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int num =0;
for (LongWritable value : values) {
if(num < 3 ){ //输出每个州最多的前3个
context.write(key,value);
num++;
}else{
return;
}
}
}
}
程序驱动类
public class CovidTopNDriver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidTopNDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidTopNDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidTopNMapper.class);
job.setReducerClass(CovidTopNReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CovidBean.class);
job.setMapOutputValueClass(LongWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(CovidBean.class);
job.setOutputValueClass(LongWritable.class);
//todo 设置自定义分组
job.setGroupingComparatorClass(CovidGroupingComparator.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//判断输出路径是否存在 如果存在删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
执行结果
MapReduce并行度机制
MapTask并行度机制
概念
MapTask的并行度指的是map阶段有多少个并行的task共同处理任务。map阶段的任务处理并行度,势必影响到整个job的处理速度。那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?
原理机制
一个MapReducejob的map阶段并行度由客户端在提交job时决定,即客户端提交job之前会对待处理数据进行逻辑切片。切片完成会形成切片规划文件(job.split),每个逻辑切片最终对应启动一个maptask。
逻辑切片机制由FileInputFormat实现类的getSplits()方法完成。
FileInputFormat中默认的切片机制:
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1—0M~128M
file1.txt.split2—128M~256M
file1.txt.split3—256M~320M
file2.txt.split1—0M~10M
相关参数、优化
在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定:
minsize:默认值:1
配置参数: mapreduce.input.fileinputformat.split.minsize
maxsize:默认值:Long.MAXValue
配置参数:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默认情况下,split size=block size
,在hadoop 2.x中为128M。
但是,不论怎么调参数,都不能让多个小文件“划入”一个split。
还有个细节就是:
当bytesRemaining/splitSize > 1.1
不满足的话,那么最后所有剩余的会作为一个切片。从而不会形成例如129M文件规划成两个切片的局面。
ReduceTask并行度机制
reducetask并行度同样影响整个job的执行并发度和执行效率,与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
job.setNumReduceTasks(4);
如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask。
MapReduce工作流程详解
MapTask工作机制详解
流程图
执行步骤
整个Map阶段流程大体如上图所示。简单概述:input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
详细步骤:
- Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
- Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。
- 默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
- 把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
- 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
MapReduce Shuffle机制
Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。
而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。
shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
1).Partition阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存之前会对key进行分区的计算,默认Hash分区等。
2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
3).Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
4).Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。