MapReduce入门与基础理论

初识MapReduce

理解MapReduce思想

MapReduce思想在生活中处处可见,每个人或多或少都曾接触过这种思想。MapReduce的思想核心是“先分再合分而治之”, 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果。

这种思想来源于日常生活与工作时的经验,同样也完全适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。

image-20211116224313624

Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

Reduce负责“合”,即对map阶段的结果进行全局汇总。

这两个阶段合起来正是MapReduce思想的体现。

image-20211116223922011

一个比较形象的语言解释MapReduce:  

我们要数停车场中的所有的车数量。你数第一列,我数第二列。这就是“Map”。我们人越多,能够同时数车的人就越多,速度就越快。

数完之后,我们聚到一起,把所有人的统计数加在一起。这就是“Reduce”。

场景:如何模拟实现分布式计算

什么是分布式计算

分布式计算是一种计算方法,和集中式计算是相对的。

随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成。

分布式计算将该应用分解成许多小的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高计算效率。

image-20211117084148013

大数据场景下模拟实现

image-20211117084223160

Hadoop MapReduce设计构思

MapReduce是Hadoop的一个模块,是一个分布式运算程序的编程框架。

对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。

Hadoop MapReduce构思体现在如下的三个方面。

如何对付大数据处理

对相互间不具有计算依赖关系的大数据计算任务,实现并行最自然的办法就是采取MapReduce分而治之的策略。

也就是Map阶段分的阶段,把大数据拆分成若干份小数据,多个程序同时并行计算产生中间结果;然后是Reduce聚合阶段,通过程序对并行的结果进行最终的汇总计算,得出最终的结果。

并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!

image-20211117084321086

构建抽象模型

MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。

Map: 对一组数据元素进行某种重复式的处理;

image-20211117084342975

Reduce: 对Map的中间结果进行某种进一步的结果整理。

image-20211117084356957

MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:

map: (k1; v1) → [(k2; v2)]

reduce: (k2; [v2]) → [(k3; v3)]

Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是键值对

统一架构、隐藏底层细节

如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。

MapReduce最大的亮点在于通过抽象模型和计算框架把需要做什么(what need to do)与具体怎么做(how to do)分开了,为程序员提供一个抽象和高层的编程接口和框架。

程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。

Hadoop MapReduce简介

MapReduce介绍

Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集)。

MapReduce是一种面向海量数据处理的一种指导思想,也是一种用于对大规模数据进行分布式计算的编程模型。

MapReduce最早由Google于2004年在一篇名为《MapReduce:Simplified Data Processingon Large Clusters》的论文中提出,把分布式数据处理的过程拆分为Map和Reduce两个操作函数(受到Lisp以及其他函数式编程语言的启发),随后被Apache Hadoop参考并作为开源版本提供支持。它的出现解决了人们在最初面临海量数据束手无策的问题,同时,它还是易于使用和高度可扩展的,使得开发者无需关系分布式系统底层的复杂性即可很容易的编写分布式数据处理程序,并在成千上万台普通的商用服务器中运行。

image-20211117084512518

MapReduce特点

  1. 易于编程

Mapreduce框架提供了用于二次开发得接口;简单地实现一些接口,就可以完成一个分布式程序。任务计算交给计算框架去处理,将分布式程序部署到hadoop集群上运行,集群节点可以扩展到成百上千个等。

  1. 良好的扩展性

当计算机资源不能得到满足的时候,可以通过增加机器来扩展它的计算能力。基于MapReduce的分布式计算得特点可以随节点数目增长保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增至几百或者几千可以很容易地处理数百TB甚至PB级别的离线数据。

  1. 高容错性

Hadoop集群是分布式搭建和部署的,任何单一机器节点宕机了,它可以把上面的计算任务转移到另一个节点上运行,不影响整个作业任务得完成,过程完全是由Hadoop内部完成的。

  1. 适合海量数据的离线处理

可以处理GB、TB和PB级别得数据量

MapReduce局限性

MapReduce虽然有很多的优势,也有相对得局限性,不代表不能做,而是在有些场景下实现的效果比较差,并不适合用MapReduce来处理,主要表现在以下结果方面:

  1. 实时计算性能差

MapReduce主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应。

  1. 不能进行流式计算

流式计算特点是数据是源源不断的计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态数据集的,数据是不能动态变化的。

Hadoop MapReduce编程

MapReduce架构体系

一个完整的mapreduce程序在分布式运行时有三类实例进程:

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、MapTask:负责map阶段的整个数据处理流程

3、ReduceTask:负责reduce阶段的整个数据处理流程

image-20211117090222755

MapReduce编程规范

MapReduce分布式的运算程序需要分成2个阶段,分别是Map阶段和Reduce阶段。Map阶段对应的是MapTask并发实例,完全并行运行。Reduce阶段对应的是ReduceTask并发实例,数据依赖于上一个阶段所有MapTask并发实例的数据输出结果。

MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

用户编写的程序分成三个部分:MapperReducerDriver(提交运行mr程序的客户端驱动)。

用户自定义的Mapper和Reducer都要继承各自的父类。Mapper中的业务逻辑写在map()方法中,Reducer的业务逻辑写在reduce()方法中。整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。

最需要注意的是:整个MapReduce程序中,数据都是以kv键值对的形式流转的。因此在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出kv分别是什么。并且在MapReduce中数据会因为某些默认的机制进行排序进行分组。所以说kv的类型数据确定及其重要。

Map Reduce工作执行流程

整个MapReduce工作流程可以分为3个阶段:map、shuffle、reduce。

image-20211117090427377

map阶段:

负责把从数据源读取来到数据进行处理,默认情况下读取数据返回的是kv键值对类型,经过自定义map方法处理之后,输出的也应该是kv键值对类型。

shuffle阶段:

​ map输出的数据会经过分区、排序、分组等自带动作进行重组,相当于洗牌的逆过程。这是MapReduce的核心所在,也是难点所在。也是值得我们深入探究的所在。

​ 默认分区规则:key相同的分在同一个分区,同一个分区被同一个reduce处理。

​ 默认排序规则:根据key字典序排序

​ 默认分组规则:key相同的分为一组,一组调用reduce处理一次。

reduce阶段:

​ 负责针对shuffle好的数据进行聚合处理。输出的结果也应该是kv键值对。

Hadoop序列化机制

什么是序列化

序列化 (Serialization)是将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程。

反序列化(Deserialization)是将字节流转换为一系列结构化对象的过程,重新创建该对象。

序列化的用途:

1、作为一种持久化格式。

2、作为一种通信的数据格式。

3、作为一种数据拷贝、克隆机制。

image-20211117090506469

​ 简单概况:

  把对象转换为字节序列的过程称为对象的序列化
 把字节序列恢复为对象的过程称为对象的反序列化

Java的序列化机制

Java中,一切都是对象,在分布式环境中经常需要将Object从这一端网络或设备传递到另一端。这就需要有一种可以在两端传输数据的协议。Java序列化机制就是为了解决这个问题而产生。

Java对象序列化的机制,把对象表示成一个二进制的字节数组,里面包含了对象的数据,对象的类型信息,对象内部的数据的类型信息等等。通过保存或则转移这些二进制数组达到持久化、传递的目的。

要实现序列化,需要实现java.io.Serializable接口。反序列化是和序列化相反的过程,就是把二进制数组转化为对象的过程。

image-20211117090533725

Hadoop的序列化机制

Hadoop的序列化没有采用java的序列化机制,而是实现了自己的序列化机制。

原因在于java的序列化机制比较臃肿,重量级,是不断的创建对象的机制,并且会额外附带很多信息(校验、继承关系系统等)。但在Hadoop的序列化机制中,用户可以复用对象,这样就减少了java对象的分配和回收,提高了应用效率。

Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)。

Writable接口提供两个方法(writereadFields)。

package org.apache.hadoop.io;
public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

Hadoop序列化特点:高效、紧凑、扩展性强

Hadoop中数据类型

Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。

Hadoop 数据类型 Java数据类型 备注
BooleanWritable boolean 标准布尔型数值
ByteWritable byte 单字节数值
IntWritable int 整型数
FloatWritable float 浮点数
LongWritable long 长整型数
DoubleWritable double 双字节数值
Text String 使用UTF8格式存储的文本
MapWritable map 映射
ArrayWritable array 数组
NullWritable null 中的key或value为空时使用

注意:如果需要将自定义的类放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

MapReduce经典入门案例

WordCount业务需求

WordCount中文叫做单词统计、词频统计,指的是使用程序统计某文本文件中,每个单词出现的总次数。这个是大数据计算领域经典的入门案例,虽然业务及其简单,但是希望能够通过案例感受背后MapReduce的执行流程和默认的行为机制,这才是关键。

#输入数据 1.txt
hello hadoop hello hello
hadoop allen hadoop
.....

#输出结果
hello 3
hadoop 3
allen 1

MapReduce编程思路

image-20211117090737894

map阶段的核心:把输入的数据经过切割,全部标记1。因此输出就是<单词,1>。

shuffle阶段核心:经过默认的排序分区分组,key相同的单词会作为一组数据构成新的kv对。

reduce阶段核心:处理shuffle完的一组数据,该组数据就是该单词所有的键值对。对所有的1进行累加求和,就是该单词的总次数。最终输出<单词,总次数>。

WordCount编程实现

编程环境搭建

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast</groupId>
    <artifactId>test-mapreduce</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.32</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Mapper类编写

public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
    //Mapper输出kv键值对  <单词,1>
    private Text keyOut = new Text();
    private final static LongWritable valueOut = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //将读取的一行内容根据分隔符进行切割
        String[] words = value.toString().split("\\s+");
        //遍历单词数组
        for (String word : words) {
            keyOut.set(word);
            //输出单词,并标记1
            context.write(new Text(word),valueOut);
        }
    }
}

Reducer类编写

public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {

    private LongWritable result = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //统计变量
        long count = 0;
        //遍历一组数据,取出该组所有的value
        for (LongWritable value : values) {
            //所有的value累加 就是该单词的总次数
            count +=value.get();
        }
        result.set(count);
        //输出最终结果<单词,总次数>
        context.write(key,result);
    }
}

客户端驱动类编写

方式1:直接构建作业启动

public class WordCountDriver_v1 {
    public static void main(String[] args) throws Exception {
        //配置文件对象
        Configuration conf = new Configuration();
        // 创建作业实例
        Job job = Job.getInstance(conf, WordCountDriver_v1.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(WordCountDriver_v1.class);
        // 设置作业mapper reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //判断输出路径是否存在 如果存在删除
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }
        // 提交作业并等待执行完成
        boolean resultFlag = job.waitForCompletion(true);
        //程序退出
        System.exit(resultFlag ? 0 :1);
    }
}

方式2:Tool工具类创建启动

public class WordCountDriver_v2 extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // 创建作业实例
        Job job = Job.getInstance(getConf(), WordCountDriver_v2.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(WordCountDriver_v2.class);

        // 设置作业mapper reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //判断输出路径是否存在 如果存在删除
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }

        // 提交作业并等待执行完成
        return job.waitForCompletion(true) ? 0 : 1;

    }


    public static void main(String[] args) throws Exception {
        //配置文件对象
        Configuration conf = new Configuration();
        //使用工具类ToolRunner提交程序
        int status = ToolRunner.run(conf, new WordCountDriver_v2(), args);
        //退出客户端程序 客户端退出状态码和MapReduce程序执行结果绑定
        System.exit(status);
    }
}

MapReduce程序运行

所谓的运行模式讲的是:mr程序是单机运行还是分布式运行?mr程序需要的运算资源是yarn分配还是单机系统分配?

运行在何种模式 取决于下述这个参数:

mapreduce.framework.name=yarn 集群模式

mapreduce.framework.name=local 本地模式

如果不指定 默认是什么模式呢?

默认是local模式 在mapred-default.xml中有定义。如果代码中、运行的环境中有配置,会默认覆盖default配置。

本地模式运行

mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行。而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上

本质是程序的conf中是否有mapreduce.framework.name=local

本地模式非常便于进行业务逻辑的debug。

右键直接运行main方法所在的主类即可。

集群模式运行

将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行。处理的数据和输出结果应该位于hdfs文件系统

提交集群的实现步骤:

将程序打成jar包,然后在集群的任意一个节点上用命令启动

hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver args

yarn jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver args

MapReduce输入输出梳理

MapReduce框架运转在键值对上,也就是说,框架把作业的输入看成是一组键值对,同样也产生一组键值对作为作业的输出,这两组键值对可能是不同的。

image-20211117091241399

image-20211117091246116

输入特点

默认读取数据的组件叫做TextInputFormat。

关于输入路径:

  • 如果指向的是一个文件 处理该文件。
  • 如果指向的是一个文件夹(目录) 就处理该目录所有的文件 当成整体来处理。

输出特点

默认输出数据的组件叫做TextOutputFormat。

输出路径不能提前存在 否则执行报错 对输出路径进行检测判断。

MapReduce流程简单梳理

执行流程图

image-20211117091359517

Map阶段执行过程

  • 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)
  • 第二阶段是对切片中的数据按照一定的规则解析成对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat)
  • 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个,调用一次map方法。每次调用map方法会输出零个或多个键值对。
  • 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。
  • 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。
  • 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。

Redue阶段执行过程

  • 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。
  • 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
  • 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。

在整个MapReduce程序的开发过程中,我们最大的工作量是覆盖map函数和覆盖reduce函数。