MapReduce整体流程与核心源码解读
Debug环境准备
Debug代码:MR经典入门案例Word Count
Mapper类
package cn.itcast.hadoop.mapreduce.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @description:
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
for (String word : words) {
context.write(new Text(word),new LongWritable(1));
}
}
}
Reducer类
package cn.itcast.hadoop.mapreduce.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @description:
*/
public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count +=value.get();
}
context.write(key,new LongWritable(count));
}
}
程序运行的主类
package cn.itcast.hadoop.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @description:
*/
public class WordCountDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// 创建作业实例
Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(this.getClass());
// 设置作业mapper reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 判断输出路径是否存在 如果存在删除
FileSystem fs = FileSystem.get(getConf());
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
// 提交作业并等待执行完成
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
// 配置文件对象
Configuration conf = new Configuration();
// 使用工具类ToolRunner提交程序
int status = ToolRunner.run(conf, new WordCountDriver(), args);
// 退出客户端程序 客户端退出状态码和MapReduce程序执行结果绑定
System.exit(status);
}
}
Debug工具:IDEA Debug功能的使用
1.1.1 如何进入Debug模式
鼠标单击在需要停止的代码处打上断点:
右键本地运行程序的时候,选择debug模式运行:
1.1.1 Debug 模式界面
1.1.1 Debug 调试相关快捷键
1.1.1 扩展:IDEA 远程Debug
生产环境中,MapReduce程序通常在yarn模式下运行,是一种分布式执行的环境。可以通过远程调试的方式在IDEA中进行debug。
1.1.1.1 Linux集群启动监听服务
要想通过远程断点调试,那么首先得在远程服务器端启动MapReduce任务的时候需要先暂停并开启一个监听服务,等待客户端的连接调试,可以通过设置运行时JVM的参数来搞定
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000
该命令参数是JDK自带的,可以通过 java -agentlib:jdwp=help 来查看agentlib各参数的具体说明,其中address=8000表示监听端口为8000。
针对上述参数,可以使用shell临时变量进行设置:
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
在运行MapReduce程序的时候,可以发现启动程序暂定了并且开启了服务监听端口8000
1.1.1.1 Windows本地IDEA**配置**
在IDEA中新建一个远程Debug调试配置:
配置信息如下:
Debug运行即可开始远程的调试MapReduce任务执行过程了
后面的使用跟本地dubug类似,只不过此时程序是在远程服务器上运行的。
MapReduce job提交源码追踪
MapReduce程序入口方法
作为使用java语言编写的MapReduce程序,其入口方法为main方法。在main方法中,使用了ToolRunner启动运行了MapReduce客户端主类,其逻辑实现定义在run方法中。
@Override
public int run(String[] args) throws Exception {
// 创建作业实例
Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(this.getClass());
// 设置作业mapper reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//判断输出路径是否存在 如果存在删除
FileSystem fs = FileSystem.get(getConf());
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
// 提交作业并等待执行完成
return job.waitForCompletion(true) ? 0 : 1;
}
job.waitForCompletion
客户端的最后执行了Job.waitForCompletion()方法,从名字上可以看出该方法的功能是等待MR程序执行完毕。进入该方法内部:
在判断状态state可以提交Job后,执行submit()方法。monitorAndPrintJob()方法会不断的刷新获取job运行的进度信息,并打印。boolean参数verbose为true表明要打印运行进度,为false就只是等待job运行结束,不打印运行日志。
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
//当job状态为define时
if (state == JobState.DEFINE) {
submit();//aw:提交job
}
if (verbose) {//verbose值由用户指定 boolean类型
//aw:随着进度和任务的进行,实时监视作业和打印状态
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
// 从客户端根据轮询间隔(默认5000 ms) 拉取完成状态信息
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();//检查作业是否成功完成。返回true表示成功。
}
job.submit
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
//再次检查确保作业状态为define
ensureState(JobState.DEFINE);
//设置使用新api
setUseNewAPI();
//跟程序运行环境建立连接
connect();
//获取job提交器 根据运行环境分为local提交器、yarn提交器
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);//todo 提交job
}
});
//客户端提交job成功,状态更新为running
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
connect
MapReduce作业提交时,连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。
在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,而Hadoop2.x中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {//若cluster空,则构造Cluster实例
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration());
}
});
}
}
Cluster
Cluster类中最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider、客户端通信协议ClientProtocol,实例叫做client,而后者是依托前者的create()方法生成的。
在ClientProtocol中,定义了很多方法,客户端可以使用这些方法进行job的提交、杀死、或是获取一些程序状态信息。
在Cluster的构造方法中,完成了初始化的动作。
initialize
在Cluster类的构造方法中,调用了initialize初始化方法。依次取出每个ClientProtocolProvider,通过其create()方法构造ClientProtocol实例。如果配置文件没有配置YARN信息,则构建LocalRunner,MR任务本地运行,如果配置文件有配置YARN信息,则构建YarnRunner,MR任务在YARN集群上运行。
ClientProtocolProvider
上面create()方法时提到了两种ClientProtocolProvider实现类。
MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。
Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner。
LocalClientProtocolProvider
YarnClientProtocolProvider
YARNRunner中最重要的一个变量就是ResourceManager的代理ResourceMgrDelegate类型的resMgrDelegate实例。
Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。
submitJobInternal
在submit方法的最后,调用了提交器submitter.submitJobInternal方法进行任务的提交。它是提交Job的内部方法,实现了提交Job的所有业务逻辑。
JobSubmitter的类一共有四个类成员变量,分别为:
文件系统FileSystem实例jtFs:用于操作作业运行需要的各种文件等;
客户端通信协议ClientProtocol实例submitClient:用于与集群交互,完成作业提交、作业状态查询等,上文已经介绍过了。
提交作业的主机名submitHostName;
提交作业的主机地址submitHostAddress。
下面就是提交任务的核心代码:
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs 检查作业的输出规范的有效性
//aw:比如检查输出路径是否配置并且是否存在。正确情况是已经配置且不存在
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
//aw:获取作业准备区路径,用于作业及相关资源的提交存放,比如:jar、切片信息、配置信息等
//默认是/tmp/hadoop-yarn/staging/提交作业用户名/.staging
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {//记录提交作业的主机IP、主机名
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
//aw: 与运行集群通信,将获取的jobID设置入job
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
//创建最终作业准备区路径,jobStagingArea后接/jobID
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {//设置一些作业参数
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir 获得路径的授权令牌
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}
//aw:拷贝作业相关的资源文件到submitJobDir作业准备区,比如:-libjars,-files,-archives
copyAndConfigureFiles(job, submitJobDir);
//创建文件job.xml 用于保存作业的配置信息
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job todo
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
//aw:生成本次作业的输入切片信息,并把切片信息写入作业准备区submitJobDir
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
MRJobConfig.DEFAULT_JOB_MAX_MAP);
if (maxMaps >= 0 && maxMaps < maps) {
throw new IllegalArgumentException("The number of map tasks " + maps +
" exceeded limit " + maxMaps);
}
// write "queue admins of the queue to which job is being submitted"
// to job file.队列信息
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// 把作业配置信息写入作业准备区的job.xml文件中
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
//aw:到这里,终于进行真正的作用提交了
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
MapReduce整体流程
MapReduce从概念上讲,它很简单,先分再合,分而治之。但是在Hadoop中却有一种复杂的实现。虽然很多细节和特性已经被框架封装完毕,用户负责业务逻辑即可。
但是对于开发人员来说,如果能够更加清楚准确的理解MapReduce执行流程,那么对于熟练使用MapReduce框架解决业务问题、理解分布式计算思想将会有非常大的帮助。
在附件资料中,还有更加完整详细的MapReduce执行流程图。
Map阶段执行流程
Map阶段整体概述
整个Mapper阶段流程大体如图所示。简单概述:
input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理;
数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer;
每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘;
当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
前置:解读MapTask类
在MapReduce程序中,初登场的task叫做maptask。MapTask类作为maptask的一个载体,调用的就是里面的run方法,开启map任务。
第一层调用(run)
在MapTask.run方法的第一层调用中,有下面两个重要的代码段。
map阶段的任务划分
if (isMapTask()) { // 判断当前的task是否为maptask
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
// 如果reducetask的个数为0 也就意味着程序没有reducer阶段 mapper的输出就是程序最终的输出
// 这样的话,就没有必要进行shuffle了。
if (conf.getNumReduceTasks() == 0) {
//map阶段占据整个maptask任务的100%
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
// 如果有reducetask的话,map阶段占据67%,sort阶段占据33%
// 为什么要sort 因为要shuffle给reducetask
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
运行Mapper
在提交任务的时候 MR框架会自己进行选择使用什么API。默认情况下,使用的都是新的API,除非特别指定了使用old API 详细见job.setUseNewAPI()。
第二层调用(runNewMapper)准备部分
默认情况下,框架使用new API来运行,所以将执行runNewMapper()。
runNewMapper内第一大部分代码我们称之为maptask运行的准备部分,其主要逻辑是创建maptask运行时需要的各种依赖:包括Split切片信息、inputFormat、LineRecordReader、用户写的map函数(位于自定义mapper中)、taskContext上下文、输出收集器OutputCollector,用于maptask处理完数据输出结果。
TaskContext
TaskContext为Task的上下文对象,基于此对象可以获取到Task执行期间的相关状态信息。
split
inputFormat
RecordReader
在NewTrackingRecordReader方法中,最终创建了RecordReader
mapper
OutputCollector
第二层调用(runNewMapper)工作部分
runNewMapper内第二大部分代码我们称之为maptask工作干活的部分,其主要逻辑是:如何从切片读取数据,如何调用map处理数据,如何调用OutputCollector收集输出的数据。
RecoderReader.initialize
默认情况下的实现逻辑位于LineRecorderReader中。核心逻辑:
打开文件定位切片的位置,判断文件是否压缩,如果切片不是第一个切片那么读取数据的时候舍去第一行数据不要读取。
mapper.run
NewOutputCollector
在这段程序中,createSortingCollector创建map输出收集器是最复杂的一部分,因为和后续环形缓冲区操作有关。进入createSortingCollector方法。
注意此处的默认实现是MapOutputBuffer。
关于输出收集器和环形缓冲区的细节后续章节继续描述。
InputFormat
整个MapReduce以InputFormat开始,其负责读取待处理的数据。默认的实现叫做Text InputFormat。
InputFormat核心逻辑体现在两个方面:
一是:如何读取待处理目录下的文件。一个一个读?还是一起读?
二是:读取数据的行为是什么以及返回什么样的结果?一行一行读?按字节读?
getSplits
对于待处理的目录文件,MapReduce程序面临的首要问题就是:究竟启动多少个MapTask来处理本次job。
该问题也叫做maptask的并行度问题,指的是map阶段有多少个并行的task共同处理任务。
map阶段并行度由客户端在提交job时决定,即客户端提交job之前会对待处理数据进行逻辑切片。切片完成会形成切片规划文件(job.split),每个逻辑切片最终对应启动一个maptask。
逻辑切片机制由FileInputFormat实现类的getSplits()方法完成。
首先需要计算出split size切片大小,其计算方法如下:
其中maxSize,minSize的默认值为:
因此,通过计算,默认情况下,最终split size= block size =128M。
以切片大小逐个遍历待处理的文件,形成逻辑规划文件,比如待处理目录下有下面几个文件:
a.txt 300M
b.txt 100M
将会生成如下几个逻辑切片信息:
split0-------->a.txt 0-128M
split1-------->a.txt 128-256M
split2-------->a.txt 256-300M
split3-------->b.txt 0-100M
默认情况下,有多少个**split就对应启动多少个MapTask**。
在getSplits方法中,创建了一个集合splits,用于保存最终的切片信息。
集合中的每个元素就是一个切片的具体信息:
生成的切片信息在客户端提交job中,也就是JobSubmitter. writeSplits方法中,把所有切片进行排序,大的切片在前,然后序列化到一个文件中,此文件叫做逻辑切片文件。
bytesRemaining
在进行逻辑切片的时候,假如说一个文件恰好是129M大小,那么根据默认的逻辑切片规则将会形成一大一小两个切片(0-128 128-129),并且将启动两个maptask。这明显对资源的利用效率不高。
因此在设计中,MapReduce时刻会进行bytesRemaining,剩下文件大小,如果剩下的不满足 bytesRemaining/splitSize > SPLIT_SLOP,那么将不再继续split,而是剩下的所有作为一个切片整体。
createRecordReader
最终负责读取切片数据的是RecordReader类,默认实现是LineRecordReader。其名字已经透露出来其读取数据的行为是:一行一行按行读取数据。
在LineRecordReader中,核心的方法有: initialize初始化方法,nextKeyValue读取数据方法
initialize
initialize属于LineRecordReader的初始化方法,会被MapTask调用且调用一次。里面描述了如何从切片读取数据。
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;//拿到分配的切片
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);//一行能够处理的最大长度
start = split.getStart();//要处理的切片中第一个字节的位置
end = start + split.getLength();//切片的结束位置
final Path file = split.getPath();//切片的存储路径
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);//打开切片文件开始读数据,返回的是FSDataInputStream输入流
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);//获得文件中编码器
if (null!=codec) {//判断是否进行编码压缩 如果不为空 意味着文件被编码了
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {//判断压缩文件是否可切分 如果是可切分的压缩算法
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {//来到这里 表示压缩的编码算法是不可被切分的
if (start != 0) {
// So we have a split that is only part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
//不可切分的压缩文件整体由SplitLineReader来处理
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {//这里表示文件未被编码压缩
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
//如果当前处理的不是第一个切片 那么将舍弃第一行记录不处理
if (start != 0) {
//读取一行数据 读取的时候会判断用户是否指定了换行符 如果指定使用用户指定的 如果未指定使用默认的
//默认的换行符取决于操作系统 Linux:\n windows:\r\n Mac:\n
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
nextKeyValue
nextKeyValue方法用于判断是否还有下一行数据以及定义了按行读取数据的逻辑:
一行一行读取,返回《key,value》键值对类型数据
其中key是每行起始位置的offset偏移量,value为这一行的内容。
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);//起始位置偏移量
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
//AllenWoon: 总是多读取下一个切片的一行数据
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {//起始位置为0的话 跳过文本的UTF-8 BOM头信息
newSize = skipUtfByteOrderMark();
} else {
//读取该行数据 默认使用readDefaultLine方法读取数据 根据\r\n回车换行符读取一行行数据
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;//更新偏移量
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
优化措施
由于文件在HDFS上进行存储的时候,物理上会进行分块存储,可能会导致文件内容的完整性被破坏。为了避免这个问题,在实际读取split数据的时候,每个maptask会进行读取行为的调整,具体来说:
一是:每个maptask都多处理下一个split的第一行数据;
二是:除了第一个,每个maptask都舍去自己的第一行数据不处理。
Mapper
mapper中有3个方法,分别是setup初始化方法、map方法、cleanup扫尾清理方法。而maptask的业务处理核心是在map方法中定义的。用户可以在自定义的mapper中重写父类的map方法逻辑。
map
对于map方法,如果用户不重写,父类中也有默认实现逻辑。其逻辑为:输入什么,原封不动的输出什么,也就意味着不对数据进行任何处理。
此外还要注意,map方法的调用周期、次数取决于父类中run方法。当LineRecordReader. nextKeyValue返回true时,意味着还有数据,Line RecordReader每读取一行数据,返回一个kv键值对,就调用一次map方法。
因此得出结论:mapper阶段默认情况下是基于行处理输入数据的。
Output Collector
map方法中根据业务逻辑一行行处理完之后,最终是调用的是context.write()方法将结果输出。至于输出的数据到哪里,在MapTask类中定义了两种情况,即:MR程序是否有Reducer阶段?
如果有reducer阶段,则创建输出收集器。
如果没有reducer阶段,则创建outputFormat,默认实现是TextOutputFormat,直接将处理的结果输出到指定目录文件中。
我们关注的重点当然是带有reducer阶段的MR程序,否则程序到此就结束了。
进入NewOutputCollector构造方法,核心方法是createSortingCollector。此外还确定了程序是否需要进行分区以及分区的实现类是什么。
在createSortingCollector方法内部,核心是创建具体的输出收集器MapOutputBuffer。MapOutputBuffer就是口语中俗称的map输出的缓冲区,即在有reduce阶段的情况下,map的输出结果不是直接写入磁盘的,还是先写入内存的缓冲区中。
当创建好MapOutputBuffer之后,在返回给MapTask之前对其进行了init初始化。关于初始化的细节我们在环形缓冲区中细说。
Partitioner
在程序的mapper阶段context.write打上断点,追踪一下输出的数据进行了哪些操作。
不断进入发现,最终调用的是MapTask中的Write方法。Write方法中把输出的数据kv通过收集器写入了环形缓冲区,在写入之前这里还进行了数据分区计算。
partitioner.getPartition(key, value, partitions)就是计算每个mapper的输出分区编号是多少。注意,只有当reducetask >1的时候。才会进行分区的计算。
默认的分区器在JobContextImpl中定义,是HashPartitioner。
默认的分区规则也很简单: key.hashCode() % numReduceTasks
为了避免hashcode值为负数,通过和Integer最大值进行与计算修正hashcode为正。
Circular buffer
环形缓冲区概念及意义
环形缓冲区(Circular buffer)的环形是一个抽象概念。缓冲区的作用是批量收集mapper的输出结果,减少磁盘IO的影响。想一下,一个一个写和一个批次一个批次写,哪种效率高?
环形缓冲区本质是byte数组,里面存放着key、value的序列化数据和key、value的元数据信息。
其中kvbuffer字节数组存储真正的kv数据,kvmeta存储对应的元数据。
每个key/value对应一个元数据,元数据由4个int组成:第一个int存放value的起始位置,第二个存放key的起始位置,第三个存放partition,最后一个存放value的长度。
因为key/value写入kvbuffer时是要经过序列化的,所以我们要记录每一个key和value序列化后在kvbuffer中的起始和终止位置。
//aw:存储元数据信息 注意这是一个intbuffer
private IntBuffer kvmeta;
//分割标识,因为meta数据和key value内容都存放在同一个环形缓冲区,所以需要分隔开
int equator;
//aw:内存缓冲区的核心 存储key value序列化之后的数据 注意是字节数组
byte[] kvbuffer;
/**
* aw:一个key/value键值对对应一条元数据,一条元数据由4个int组成。
* 第一个存放value的起始位置(VALSTART)
* 第二个存放key的起始位置(KEYSTART)
* 第三个存放partition(PARTITION)
* 第四个存放value的长度(VALLEN)
* 以此类推,然后下面4个int是下一个kv的元数据。
*/
private static final int VALSTART = 0; // val offset in acct
private static final int KEYSTART = 1; // key offset in acct
private static final int PARTITION = 2; // partition offset in acct
private static final int VALLEN = 3; // length of value
key/value序列化的数据和元数据在环形缓冲区中的存储是由equator(赤道)分隔的。
key/value按照索引递增的方向存储,meta则按照索引递减的方向存储,将其数组抽象为一个环形结构之后,以equator为界,key/value顺时针存储,meta逆时针存储。
环形缓冲区是有大小限制,默认是100MB。由参数mapreduce.task.io.sort.mb控制。
环形缓冲区的初始化
在MapTask中创建OutputCollector的时候,对环形缓冲区进行了初始化的动作。
初始化的过程中,主要是构造环形缓冲区的抽象数据结构。包括不限于:设置缓冲区大小、溢出比、初始化kvbuffer|kvmeta、设置Equator标识分界线、构造排序的实现类、combiner、压缩编码等。
环形缓冲区的数据收集
mapper的map方法处理完数据之后,是调用context.write方法将结果进行输出。debug不断进入发现,最终调用的是MapTask中的Write方法。
在write方法中,调用collector.collect向环形缓冲区中写入数据,数据写入之前也进行了分区partition计算。在有reducer阶段的情况下,collector的实现是MapOutputBuffer。
收集数据到环形缓冲区核心逻辑有:序列化key到字节数组,序列化value到字节数组,写入该条数据的元数据(起始位置、partition、长度)、更新kvindex。
Spill、Sort
Spill溢写
环形缓冲区虽然可以减少IO次数,但是总归有容量限制,不能把所有数据一直写入内存,数据最终还是要落入磁盘上存储的,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。
这个溢写是由单独线程来完成,不影响往缓冲区继续写数据。整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spill percent = 100MB 0.8 = 80MB),spill线程启动。
spill线程是由startSpill()方法唤醒的,在进行spill操作的时候,此时map向buffer的写入操作并没有阻塞,剩下20M可以继续使用。
溢写的线程叫做SpillThread,查看其run方法,run中主要是sortAndSpill。
创建溢写记录(索引)、溢写文件(该文件位于机器的本地文件系统,而不是HDFS)。
SpillThread输出的每个spill文件都有一个索引,其中包含有关每个文件中分区的信息-分区的开始位置和结束位置。这些索引存储在内存中,叫做SpillRecord溢写记录,可使用内存量为mapreduce.task.index.cache.limit.bytes,默认情况下等于1MB。
如果不足以将索引存储在内存中,则所有下一个创建的溢出文件的索引都将与溢出文件一起写入磁盘。
溢写数据到临时文件中:
更新spillRec:
将内存中的spillRec写入磁盘变成索引文件。
在spill的同时,map往buffer的写操作并没有停止,依然在调用collect。满足条件继续spill,以此往复。
Sort 排序
在溢写的过程中,会对数据进行排序。
排序规则是MapOutputBuffer.compare,采用的是QuickSort快速排序。
先对partition进行排序其次对key值排序。这样,数据按分区排序,并且在每个分区内按键对数据排序。
Merge
每次spill都会在磁盘上生成一个临时文件,如果map的输出结果真的很大,有多次这样的spill发生,磁盘上相应的就会有多个临时文件存在。这样将不利于reducetask处理数据。
当mapper和最后一次溢出都结束时,溢出线程终止,合并(merge)阶段开始。
在合并阶段,应将所有溢出文件合并在一起以形成一个map输出文件。
默认情况下,一个合并过程最多可以处理100个溢出文件(负责此操作的参数是mapreduce.task.io.sort.factor)。如果超过,将进行多次merge合并。
最终一个maptask的结果是一个输出文件,其中包含map的所有输出数据以及索引文件,索引文件描述了ReduceTask的分区开始-停止信息,以便能够轻松获取与其将运行的相关分区数据。
Combiner
Combiner(规约)的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。默认情况下不开启。
当job设置了Combiner,可能会在spill和merge的两个阶段执行。
spill时combiner执行情况源码:
merge时combiner执行情况源码:MapTask中搜mergeParts方法。
Reduce阶段执行流程
Reduce阶段整体概述
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。
copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,到各个maptask那里去拉取属于自己分区的数据。在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。
待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段。
完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
前置:解读Reduce Task类
ReduceTask类作为reducetask的一个载体,调用的就是里面的run方法,然后开启reduce任务。
第一层调用(ReduceTask.run)
reduce阶段的任务划分
整个reducetask分为3个阶段:copy拉取数据、sort排序数据、reduce处理数据。
shuffle操作
整个shuffle操作过程除了shuffle核心任务之外,还创建了reducetask工作相关的一些组件,包括但不限于:
codec解编码器
CombineOutputCollector输出收集器
shuffleConsumerPlugin(负责reduce端shuffle插件)
并且对shuffleConsumerPlugin进行了初始化init、run运行。运行返回的结果就是reduce shuffle之后的全部数据。这是shuffle过程的核心,后续深入。
shuffleContext上下文对象
GroupingComparator分组比较器
运行reducer
shuffle完的结果将进入到reducer进行最终的reduce处理。
第二层调用(runNewReducer)准备部分
默认情况下,框架使用new API来运行,所以将执行runNewReducer()。
runNewReducer内第一大部分代码我们称之为reducetask运行的准备部分。
其主要逻辑是创建reducetask运行时需要的各种依赖,包括:
taskContext上下文
创建用户编写设置的reducer类
outputFormat输出数据组件
ReducerContext上下文
接下来我们进去看一下怎么创建的reducerContext,我们进到它的实现类ReduceContextImpl里面
第二层调用(runNewReducer)工作部分
reducer.run
在runNewReducer的代码中,最后还调用了Reduer.run方法开始针对shuffle后的数据进行reduce操作。
RecordWriter
Shuffle-init
注意ShuffleConsumerPlugin是一个接口,默认的实现只有一个Shuffle.class。
初始化的过程中,核心逻辑就是创建MergeManagerImpl类。在MergeManagerImpl类中,核心的有:确定shuffle时的一些条件、是否允许内存到内存合并、启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行。
shuffle条件
启动MemToMemMerge
因为fetch来数据首先放入在内存中的,正常情况下在内存中对数据进行合并是最快的,可惜的是,默认情况下,是不开启内存到内存的合并的。
启动inMemoryMerger
启动onDiskMerger
Shuffle-run
注意ShuffleConsumerPlugin是一个接口,默认的实现只有一个Shuffle.class。
EventFetcher线程
fetchers线程
Shuffle-Copy阶段
Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。如果是本地模式运行,启动一个fetcher线程拉取数据,否则启动5个线程并发拉取。
MapHost类
MapHost类用于标记MapTask任务状态,记下MapTask host信息。
Fetcher.run
获得所有maptask处于PENDING待处理状态的主机。
然后进入核心方法,copyFromHost,从map拉取数据
copyFromHost
建立拉取数据的输入流
拉取copy数据
copyMapOutput
首先进行判断copy过来的数据放置在哪里?优先内存,超过限制放置磁盘。
因此获得的mapOutput就有两种具体的实现。通过mapOutput.shuffle开始拉取数据。
不断追踪下去,最终是两种不同的实现:
InMemoryMapOutput:把copy来的数据放置到reducetask内存中。
OnDiskMapOutput:把copy来的数据放置到磁盘上。
Shuffle-Merge阶段
在启动Fetcher线程copy数据过程中已经启动了两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。
可以从Shuffle. init——> createMergeManager—-> new MergeManagerImpl中确定。
inMemoryMerger
inMemoryMerger本质是一个MergeThread线程。进入线程run方法。
MergeThread.run
在内存中合并,合并的结果写入磁盘。
onDiskMerger
onDiskMerger本质是一个MergeThread线程。进入线程run方法。
MergeThread.run
此时应该来到在磁盘上合并的实现类中:
closeOnDiskFile
不管是在内存中合并还是在磁盘上合并,最终都调用了closeOnDiskFile方法,关闭磁盘文件。
finalMerge
当所有的Fetcher拉取数据结束之后,会进行最终一次合并,最终合并的所有数据保存在kvIter。
可以在shuffle类的run中发现。
Shuffle-Sort阶段
在合并的过程中,会对数据进行Sort排序,默认情况下是key的字典序(WritableComparable),如果用户设置比较器,则以用户设置的为准。
Reducer
当合并排序结束之后,进入到reduce阶段。
在runNewReducer方法的最后,调用了reducer.run方法运行reducer。
Reducer.run
点击进入run方法
首先在Reduce.run中调用context.nextKey()决定是否进入while,,然后调用nextKeyValue将key/value的值从input中读出,其次通过context.getValues将Iterator传入reduce中,在reduce中通过Iterator.hasNext查看此key是否有下个value,然后通过Iterator.next调用nextKeyValue去input中读取value。
然后循环迭代Iterator,读取input中相同key的value。
也就是说reduce中相同key的value值在Iterator.next中通过nextKeyValue读取的,每调用一次next就从input中读一个value。
通俗理解:key相同的被分为一组,一组中所有的value会组成一个Iterable。key则是当前的value与之对应的key。
Reducer.reduce
对于reduce方法,如果用户不重写,父类中也有默认实现逻辑。其逻辑为:输入什么,原封不动的输出什么,也就意味着不对数据进行任何处理。
通常会基于业务需求重新父类的reduce方法。
OutputFormat
reduce阶段的最后是通过调用context.write方法将数据写出的。
负责输出数据的组件叫做OutputFormat,默认实现是TextOutPutFormat。而真正负责写数据的组件叫做LineRecordWriter,Write方法就定义在其中,这一点和输入组件很是类似。
LineRecordWriter的行为是一次输出写一行,再有输出换行写。
在构造LineRecordWriter的时候,已经设置了输出的key,value之间是以\t制表符分割的。
MapReduce shuffle
shuffle是什么
Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。
而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。
shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
1).Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等。
2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
3).Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
4).Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
shuffle弊端
shuffle阶段过程繁琐、琐碎,涉及了多个阶段的任务交接。
shuffle中频繁进行数据内存到磁盘、磁盘到内存、内存再到磁盘的过程。效率极低。
shuffle阶段,大量的数据从map阶段输出,发送到reduce阶段,这一过程中,可能会涉及到大量的网络IO。
shuffle开启压缩机制
概述
可以对map的输出进行压缩(map输出到reduce输入的过程,可以shuffle过程中网络传输的数据量)
可以对reduce的输出结果进行压缩(最终保存到hdfs上的数据,主要是减少占用HDFS存储)
压缩算法
使用hadoop checknative来查看hadoop支持的各种压缩算法,如果出现openssl为false,那么就在线安装一下依赖包。
hadoop支持的压缩算法
压缩格式 | 工具 | 算法 | 文件扩展名 | 是否可切分 |
---|---|---|---|---|
DEFLATE | 无 | DEFLATE | .deflate | 否 |
Gzip | gzip | DEFLATE | .gz | 否 |
bzip2 | bzip2 | bzip2 | bz2 | 是 |
LZO | lzop | LZO | .lzo | 否 |
LZ4 | 无 | LZ4 | .lz4 | 否 |
Snappy | 无 | Snappy | .snappy | 否 |
各种压缩算法对应使用的java类
压缩格式 | 对应使用的java类 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DeFaultCodec |
gzip | org.apache.hadoop.io.compress.GZipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
压缩的设置方式
方式一:代码中设置
设置map阶段的压缩
Configuration configuration = new Configuration();
configuration.set("mapreduce.map.output.compress","true");
configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
设置reduce阶段的压缩
configuration.set("mapreduce.output.fileoutputformat.compress","true");
configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
方式二:配置文件全局设置
我们可以修改mapred-site.xml配置文件,然后重启集群,以便对所有的mapreduce任务进行压缩。
map输出数据进行压缩
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
reduce输出数据进行压缩
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>RECORD</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
所有节点都要修改mapred-site.xml,修改完成之后记得重启集群