MapReduce性能优化概述

MapReduce的应用场景

Hadoop包含了GFS的开源实现HDFS(Hadoop distributed file system)和MapReduce框架的开源实现。Hadoop得到了企业界及学术界关注,Yahoo、Facebook、Cloudera、Twitter、Intel、华为等诸多公司和技术团体对Hadoop给予了大力支持。Cloudera对Apache Hadoop及相关组件的版本兼容性进行了整合、性能优化、功能测试,推出了其企业版的开源Hadoop。Intel推出了高效、安全及易于管理的Hadoop企业版。Hadoop由于其开源性质,已成为目前研究、优化云计算框架的重要样本和基础。其中MapReduce框架很适合处理文档分析、倒排索引建立等类型的应用,然而在列存储、索引建立、连接计算、迭代计算、科学计算及调度算法方面性能需要进一步优化。

优缺点与需求

优点

(1) Mapreduce易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个程序可以分布到大量的廉价的pc机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特性使的Mapreduce编程变得非常流行。

(2) 良好的扩展性

项目中当你的计算资源得不到满足的时候,你可以通过简单的通过增加机器来扩展它的计算能力

(3) 高容错性

Mapreduce的设计初衷就是使程序能够部署在廉价的pc机器上,这就要求它具有很高的容错性。比如一个机器挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由hadoop内部完成的。

(4) 适合PB级以上海量数据的离线处理

缺点

img

MapReduce虽然有很多的优势,但是也有它不擅长的。这里的“不擅长”,不代表不能做,而是在有些场景下实现的效果差,并不适合用MapReduce来处理,主要表现在以下结果方面:

(1) 实时计算:MapReduce主要处理的数据来自于文件系统,所以无法像Oracle或MySQL那样在毫米或秒级内返回结果,如果需要大数据量的毫秒级响应,可以考虑结合实时存储系统来实现,利用HBase、Kudu等.

(2) 流计算:流计算的输入数据是动态的,而MapReduce主要的输入来自于HDFS等文件系统,数据是静态的,不能动态变化,这是因为MapReduce自身的设计特点决定了数据源必须是静态的。如果需要处理流式数据可以用Storm,Spark Steaming、Flink等流计算框架。

(3) DGA(有向无环图)计算:多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入磁盘,会造成大量的词频IO导致性能非常低下,此时可以考虑用Spark等迭代计算框架。

综合以上问题,MapReduce在解决离线分布式计算的过程中,主要考虑如何提升性能,加快分布式计算过程。

需求

基于整个MapReduce所存在的缺点,由于MapReduce整体结构已经固定,所以整体的优化方案只能从以下两点来考虑实现:

(1) 利用列存储思想,优化存储结构列存储在数据仓库、OLAP (on-line analytical processing)联机分析处理等应用上可以提高查询性能。利用列存储思想对MapReduce框架进行优化,面临合理的数据结构设计及数据压缩等挑战。

img

(2) 利用硬件资源优化连接算法,提高每个阶段的连接效率,MapReduce框架处理连接操作的过程比较复杂,面临数据倾斜、分布式环境数据传输及需要多个MapReduce作业等挑战,优化MapReduce每个阶段中的资源可以充分的利用硬件资源性能来提升MapReduce的效率。

img

IO性能优化:文件类型

优化方案

(1) 针对HDFS最初是为访问大文件而开发的, 所以会出现对大量小文件的存储效率不高问题, MapReduce在读取小文件进行处理时,也存在资源浪费导致计算效率不高的问题采用 SequenceFile和MapFile设计一个 HDFS中合并存储小文件的方案。该方案的主要思想是将小文件序列化存入一个 SequenceFIle/MapFile 容器,合并成大文件, 并建立相应的索引文件, 有效降低文件数目和提高访问效率. 通过和现有的 Hadoop Archives(HAR files)文件归档解决小文件问题的方案对比, 实验结果表明, 基于SequenceFile或者MapFile的存储小文件方案可以更为有效的提高小文件存储性能和减少HDFS文件系统的节点内存消耗

(2) 针对普通按行存储文本文件,MapReduce在处理实现聚合、过滤等功能时,性能相对较差,针对行式存储的数据处理性能差的问题,可以选择使用列式存储的方案来实现数据聚合处理,降低数据传输及读写的IO,提高整体MapReduce计算处理的性能

SequenceFile

介绍

SequenceFile是hadoop里用来存储序列化的键值对即二进制的一种文件格式。SequenceFile文件也可以作为MapReduce作业的输入和输出,hive和spark也支持这种格式。

它有如下几个优点:

  • 以二进制的KV形式存储数据,与底层交互更加友好,性能更快,所以可以在HDFS里存储图像或者更加复杂的结构作为KV对。
  • SequenceFile支持压缩和分片。当你压缩为一个SequenceFile时,并不是将整个文件压缩成一个单独的单元,而是压缩文件里的record或者block of records(块)。因此SequenceFile是能够支持分片的,即使使用的压缩方式如Snappy, Lz4 or Gzip不支持分片,也可以利用SequenceFIle来实现分片。
  • SequenceFile也可以用于存储多个小文件。由于Hadoop本身就是用来处理大型文件的,小文件是不适合的,所以用一个SequenceFile来存储很多小文件就可以提高处理效率,也能节省Namenode内存,因为Namenode只需一个SequenceFile的metadata,而不是为每个小文件创建单独的metadata。
  • 由于数据是以SequenceFile形式存储,所以中间输出文件即map输出也会用SequenceFile来存储,可以提高整体的IO开销性能。

存储特点

img

(1)sequenceFile文件是Hadoop用来存储二进制形式的[Key,Value]对而设计的一种平面文件(Flat File)。

(2) 可以把SequenceFile当做是一个容器,把所有的文件打包到SequenceFile类中可以高效的对小文件进行存储和处理。

(3) SequenceFile文件并不按照其存储的Key进行排序存储,SequenceFile的内部类Writer提供了append功能。

(4) SequenceFile中的Key和Value可以是任意类型Writable或者是自定义Writable。

(5) 存储结构上,SequenceFile主要由一个Header后跟多条Record组成,Header主要包含了Key classnamevalue classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。每条Record以键值对的方式进行存储,用来表示它的字符数组可以一次解析成:记录的长度、Key的长度、Key值和value值,并且Value值的结构取决于该记录是否被压缩。

(6) 在recourds中,又分为是否压缩格式。当没有被压缩时,key与value使用Serialization序列化写入SequenceFile。当选择压缩格式时,record的压缩格式与没有压缩其实不尽相同,除了value的bytes被压缩,key是不被压缩的。

(7) 在Block中,它使所有的信息进行压缩,压缩的最小大小由配置文件中io.seqfile.compress.blocksize配置项决定。

SequenceFile工具类

  • SequenceFileOutputFormat

img

  • 用于将MapReduce的结果输出为SequenceFile文件

  • SequenceFileInputFormat

img

  • 用于读取SequenceFile文件

生成SequenceFile

  • 需求:将普通文件转换为SequenceFile文件

img

  • 思路

    • Step1:使用TextInputFormat读取普通文件文件
    • Step2:Map阶段对读取文件的每一行进行输出
    • Step3:Reduce阶段直接输出每条数据
    • Step4:使用SequenceFileOutputFormat将结果保存为SequenceFile
  • 代码实现

    • Driver类

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.*;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      import java.io.IOException;
      import java.util.Iterator;
      
      /**
       * @ClassName MrWriteToSequenceFile
       * @Description TODO 读取文本文件,转换为SequenceFile文件
       * @Create By     itcast
       */
      public class MrWriteToSequenceFile extends Configured implements Tool {
      
          //构建、配置、提交一个 MapReduce的Job
          public int run(String[] args) throws Exception {
              // 实例化作业
              Job job = Job.getInstance(this.getConf(), "MrWriteToSequenceFile");
              // 设置作业的主程序
              job.setJarByClass(this.getClass());
              // 设置作业的输入为TextInputFormat(普通文本)
              job.setInputFormatClass(TextInputFormat.class);
              // 设置作业的输入路径
              FileInputFormat.addInputPath(job, new Path(args[0]));
              // 设置Map端的实现类
              job.setMapperClass(WriteSeqFileAppMapper.class);
              // 设置Map端输入的Key类型
              job.setMapOutputKeyClass(NullWritable.class);
              // 设置Map端输入的Value类型
              job.setMapOutputValueClass(Text.class);
              // 设置作业的输出为SequenceFileOutputFormat
              job.setOutputFormatClass(SequenceFileOutputFormat.class);
              // 使用SequenceFile的块级别压缩
              SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
              // 设置Reduce端的实现类
              job.setReducerClass(WriteSeqFileAppReducer.class);
              // 设置Reduce端输出的Key类型
              job.setOutputKeyClass(NullWritable.class);
              // 设置Reduce端输出的Value类型
              job.setOutputValueClass(Text.class);
              // 从参数中获取输出路径
              Path outputDir = new Path(args[1]);
              // 如果输出路径已存在则删除
              outputDir.getFileSystem(this.getConf()
      ).delete(outputDir, true);
              // 设置作业的输出路径
              FileOutputFormat.setOutputPath(job, outputDir);
              // 提交作业并等待执行完成
              return job.waitForCompletion(true) ? 0 : 1;
          }
      
          //程序入口,调用run
          public static void main(String[] args) throws Exception {
              //用于管理当前程序的所有配置
              Configuration conf = new Configuration();
              int status = ToolRunner.run(conf, new MrWriteToSequenceFile(), args);
              System.exit(status);
          }
      }
    • Mapper类

      /**
       * 定义Mapper类
       */
      public static class WriteSeqFileAppMapper extends Mapper<LongWritable, Text,NullWritable, Text>{
      
      
          private NullWritable outputKey;
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              this.outputKey = NullWritable.get();
          }
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              context.write(outputKey, value);
          }
      
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              this.outputKey = null;
          }}
    • Reduce类

      /**
       * 定义Reduce类
       */
      public static class WriteSeqFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{
      
          private NullWritable outputKey;
      
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              this.outputKey = NullWritable.get();
          }
      
          @Override
          protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
              Iterator<Text> iterator = value.iterator();
              while (iterator.hasNext()) {
                  context.write(outputKey, iterator.next());
              }
          }
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              this.outputKey = null;
          }
      
      }
  • 查看结果

    • SequenceFile为二进制文件,不可以直接查看结果,会显示为乱码,可以通过MapReduce读取解析为普通文本

img

读取SequenceFile

  • 需求:将上一步转换好的SequenceFile再解析转换为普通文本文件内容

  • 思路

    • Step1:使用SequenceFileInputformat读取SequenceFile

    • Step2:Map阶段直接输出每一条数据

    • Step3:Reduce阶段直接输出每一条数据

    • Step4:使用TextOutputFormat将结果保存为普通文本文件

  • 代码实现

    • Driver类

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      import java.io.IOException;
      import java.util.Iterator;
      
      /**
       * @ClassName MrReadFromSequenceFile
       * @Description TODO 读取SequenceFile文件,转换为普通文本文件
       * @Create By     itcast
       */
      public class MrReadFromSequenceFile extends Configured implements Tool {
      
          //构建、配置、提交一个 MapReduce的Job
          public int run(String[] args) throws Exception {
              // 实例化作业
              Job job = Job.getInstance(this.getConf(), "MrReadFromSequenceFile");
              // 设置作业的主程序
              job.setJarByClass(this.getClass());
              // 设置作业的输入为SequenceFileInputFormat(SequenceFile文本)
              job.setInputFormatClass(SequenceFileInputFormat.class);
              // 设置作业的输入路径
              SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
              // 设置Map端的实现类
              job.setMapperClass(ReadSeqFileAppMapper.class);
              // 设置Map端输入的Key类型
              job.setMapOutputKeyClass(NullWritable.class);
              // 设置Map端输入的Value类型
              job.setMapOutputValueClass(Text.class);
              // 设置作业的输出为TextOutputFormat
              job.setOutputFormatClass(TextOutputFormat.class);
              // 设置Reduce端的实现类
              job.setReducerClass(ReadSeqFileAppReducer.class);
              // 设置Reduce端输出的Key类型
              job.setOutputKeyClass(NullWritable.class);
              // 设置Reduce端输出的Value类型
              job.setOutputValueClass(Text.class);
              // 从参数中获取输出路径
              Path outputDir = new Path(args[1]);
              // 如果输出路径已存在则删除
              outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
              // 设置作业的输出路径
              TextOutputFormat.setOutputPath(job, outputDir);
              // 提交作业并等待执行完成
              return job.waitForCompletion(true) ? 0 : 1;
          }
      
          //程序入口,调用run
          public static void main(String[] args) throws Exception {
              //用于管理当前程序的所有配置
              Configuration conf = new Configuration();
              int status = ToolRunner.run(conf, new MrReadFromSequenceFile(), args);
              System.exit(status);
          }
      }
    • Mapper类

      /**
       * 定义Mapper类
       */
      public static class ReadSeqFileAppMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
      
      
          private NullWritable outputKey;
      
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              this.outputKey = NullWritable.get();
          }
      
          @Override
          protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {
              context.write(outputKey, value);
          }
      
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              this.outputKey = null;
          }
      
      }
    • Reducer类

      /**
       * 定义Reduce类
       */
      public static class ReadSeqFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{
      
          private NullWritable outputKey;
      
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              this.outputKey = NullWritable.get();
          }
      
          @Override
          protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
              Iterator<Text> iterator = value.iterator();
              while (iterator.hasNext()) {
                  context.write(outputKey, iterator.next());
              }
          }
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              this.outputKey = null;
          }
      
      }
    • 查看结果

      • 数据被还原为普通文本文件

img

MapFile

介绍

可以理解MapFile是排序后的SequenceFile,通过观察其结构可以看到MapFile由两部分组成分别是data和index。data既存储数据的文件,index作为文件的数据索引,主要记录了每个Record的Key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可以迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是最高的,缺点是会消耗一部分内存来存储index数据。

MapFile的数据存储结构如下:

img

需要注意的是,MapFile并不不会把所有的Record都记录到index中去,默认情况下每隔128条记录会存储一个索引映射。当然,记录间隔可认为修改,通过MapFile.Writer的setIndexInterval()方法,或修改io.map.index.interval属性。

并且与SequenceFile不同的是,MapFile的KeyClass一定要实现WritableComparable接口,即Key值是可比较的,最终实现基于Key的有序。

为了验证MapFile的效果,经过对小文件的对比测试,可以看出本文的改进小文件存储策略在文件上传时的效率与未经改进没什么差别, 但是在经过基于 MapFile 序列化的将小文件合并成大文件后, 在文件读取方面, 比未经改进和经 HAR 合并的环境下效率都高, 而且在 HDFS 空闲时, 合并过后的内存占用率明显下降, 这就减轻了 Namenode 名称节点的负担, 提高内存使用率。

img

img

MapFile工具类

l MapFileOutputFormat:用于将MapReduce的结果输出为MapFile

img

  • MapReduce中没有封装MapFile的读取输入类,工作中可根据情况选择以下方案来实现
    • 方案一:自定义InputFormat,使用MapFileOutputFormat中的getReader方法获取读取对象
    • 方案二:使用SequenceFileInputFormat对MapFile的数据进行解析

生成MapFile文件

  • 需求:将普通文件转换为MapFile文件

  • 思路

    • Step1:Input读取一个普通文件
    • Step2:Map阶段构建随机值作为Key,构建有序
    • Step3:Output生成MapFile文件
  • 实现

    • 开发代码

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.*;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      import java.io.IOException;
      import java.util.Iterator;
      import java.util.Random;
      
      /**
       * @ClassName MrWriteToMapFile
       * @Description TODO 读取文本文件,转换为MapFile文件
       * @Create By     itcast
       */
      public class MrWriteToMapFile extends Configured implements Tool {
      
          //构建、配置、提交一个 MapReduce的Job
          public int run(String[] args) throws Exception {
              Configuration conf = getConf();
              // 实例化作业
              Job job = Job.getInstance(conf, "MrWriteToMapFile");
              // 设置作业的主程序
              job.setJarByClass(this.getClass());
              // 设置作业的输入为TextInputFormat(普通文本)
              job.setInputFormatClass(TextInputFormat.class);
              // 设置作业的输入路径
              FileInputFormat.addInputPath(job, new Path(args[0]));
              // 设置Map端的实现类
              job.setMapperClass(WriteMapFileAppMapper.class);
              // 设置Map端输入的Key类型
              job.setMapOutputKeyClass(IntWritable.class);
              // 设置Map端输入的Value类型
              job.setMapOutputValueClass(Text.class);
              // 设置作业的输出为MapFileOutputFormat
              job.setOutputFormatClass(MapFileOutputFormat.class);
              // 设置Reduce端的实现类
              job.setReducerClass(WriteMapFileAppReducer.class);
              // 设置Reduce端输出的Key类型
              job.setOutputKeyClass(IntWritable.class);
              // 设置Reduce端输出的Value类型
              job.setOutputValueClass(Text.class);
              // 从参数中获取输出路径
              Path outputDir = new Path(args[1]);
              // 如果输出路径已存在则删除
              outputDir.getFileSystem(conf).delete(outputDir, true);
              // 设置作业的输出路径
              MapFileOutputFormat.setOutputPath(job, outputDir);
              // 提交作业并等待执行完成
              return job.waitForCompletion(true) ? 0 : 1;
          }
      
          //程序入口,调用run
          public static void main(String[] args) throws Exception {
              //用于管理当前程序的所有配置
              Configuration conf = new Configuration();
              int status = ToolRunner.run(conf, new MrWriteToMapFile(), args);
              System.exit(status);
          }
      
      
          /**
           * 定义Mapper类
           */
          public static class WriteMapFileAppMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
              //定义输出的Key,每次随机生成一个值
              private IntWritable outputKey = new IntWritable();
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  //随机生成一个数值
                  Random random = new Random();
                  this.outputKey.set(random.nextInt(100000));
                  context.write(outputKey, value);
              }
               }
      
          /**
           * 定义Reduce类
           */
          public static class WriteMapFileAppReducer extends Reducer<IntWritable,Text,IntWritable,Text>{
      
              @Override
              protected void reduce(IntWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
                  Iterator<Text> iterator = value.iterator();
                  while (iterator.hasNext()) {
                      context.write(key, iterator.next());
                  }
              }
      
          }
      
      }
  • 打成jar包,提交运行

yarn jar mapfile.jar bigdata.itcast.cn.mr.test.MrWriteToMapFile /datas/input/mapfile/ /datas/output/mapfile1

  • 查看结果

img

img

img

读取MapFile文件

  • 需求:将MapFile解析为普通文件内容

  • 思路

    • Step1:Input读取MapFile,注意,Hadoop没有提供MapFileInputFormat,所以使用SequenceFileInputFormat来解析,或者可以自定义InputFormat
    • Step2:Map和Reduce直接输出
    • Step3:Output将结果保存为普通文件
  • 实现

    • 开发代码

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      import java.io.IOException;
      import java.util.Iterator;
      
      /**
       * @ClassName MrReadFromMapFile
       * @Description TODO 读取MapFile文件,转换为普通文本文件
       * @Create By     itcast
       */
      public class MrReadFromMapFile extends Configured implements Tool {
      
          //构建、配置、提交一个 MapReduce的Job
          public int run(String[] args) throws Exception {
              // 实例化作业
              Job job = Job.getInstance(this.getConf(), "MrReadFromMapFile");
              // 设置作业的主程序
              job.setJarByClass(this.getClass());
              // 设置作业的输入为SequenceFileInputFormat(Hadoop没有直接提供MapFileInput)
              job.setInputFormatClass(SequenceFileInputFormat.class);
              // 设置作业的输入路径
              SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
              // 设置Map端的实现类
              job.setMapperClass(ReadMapFileAppMapper.class);
              // 设置Map端输入的Key类型
              job.setMapOutputKeyClass(NullWritable.class);
              // 设置Map端输入的Value类型
              job.setMapOutputValueClass(Text.class);
              // 设置作业的输出为SequenceFileOutputFormat
              job.setOutputFormatClass(TextOutputFormat.class);
              // 设置Reduce端的实现类
              job.setReducerClass(ReadMapFileAppReducer.class);
              // 设置Reduce端输出的Key类型
              job.setOutputKeyClass(NullWritable.class);
              // 设置Reduce端输出的Value类型
              job.setOutputValueClass(Text.class);
              // 从参数中获取输出路径
              Path outputDir = new Path(args[1]);
              // 如果输出路径已存在则删除
              outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
              // 设置作业的输出路径
              TextOutputFormat.setOutputPath(job, outputDir);
              // 提交作业并等待执行完成
              return job.waitForCompletion(true) ? 0 : 1;
          }
      
          //程序入口,调用run
          public static void main(String[] args) throws Exception {
              //用于管理当前程序的所有配置
              Configuration conf = new Configuration();
              int status = ToolRunner.run(conf, new MrReadFromMapFile(), args);
              System.exit(status);
          }
      
      
          /**
           * 定义Mapper类
           */
          public static class ReadMapFileAppMapper extends Mapper<IntWritable, Text, NullWritable, Text> {
      
      
              private NullWritable outputKey = NullWritable.get();
      
              @Override
              protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {
                  context.write(outputKey, value);
              }
      
          }
      
          /**
           * 定义Reduce类
           */
          public static class ReadMapFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{
      
              @Override
              protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
                  Iterator<Text> iterator = value.iterator();
                  while (iterator.hasNext()) {
                      context.write(key, iterator.next());
                  }
              }
          }
      
      }
    • 打成jar包,提交运行

      yarn jar mapfile.jar bigdata.itcast.cn.mr.test.MrReadFromMapFile /datas/output/mapfile1 /datas/output/mapfile2
    • 查看结果

img

ORCFile

列式存储

行存储和列存储,是数据库底层组织数据的方式。我们平常生活中或者工作中接触了很多数据的存储系统,但是大部分都是行存储系统。比如我们学习的数据库管理系统,我们将数据库中的表想象成一张表格,每条数据记录就是一行数据,每行数据包含若干列。所以我们对大部分数据存储的思维也就是一个复杂一点的表格管理系统。我们在一行一行地写入数据,然后按查询条件查询过滤出我们想要的行记录。大部分传统的RDBMS(关系型数据库),都是面向行来组织数据的,比如MySQL,Oracle、PostgreSQL。

传统 OLTP(Online Transaction Processing)数据库通常采用行式存储。以下图为例,所有的列依次排列构成一行,以行为单位存储,再配合以 B+ 树或 SS-Table 作为索引,就能快速通过主键找到相应的行数据。

img

img

行存储将会以上方式将数据存储在磁盘上。它利于数据一行一行的写入,写入一条数据记录时,只需要将数据追加到已有数据记录后面即可。

行模式存储适合 OLTP(Online Transaction Processing)系统。因为数据基于行存储,所以数据的写入会更快,对按行查询数据也更简单

我们常见的数据存储都是行式存储,那为什么我们要学习列式存储呢?

因为我们现在学习的数据处理,大数据,数据分析,也就是 OLAP(Online Analytical Processing)在线分析系统的需求增多了,数据写入的事务和按记录查询数据都不是它的关注点,它关注的是数据过滤,统计聚合,例如统计数据中的行数、平均值、最大值、最小值等。

列式存储(Column-oriented Storage)并不是一项新技术,最早可以追溯到 1983 年的论文 Cantor。然而,受限于早期的硬件条件和使用场景,主流的事务型数据库(OLTP)大多采用行式存储,直到近几年分析型数据库(OLAP)的兴起,列式存储这一概念又变得流行。

对于 OLAP 场景,一个典型的查询需要遍历整个表,进行分组、排序、聚合等操作,这样一来按行存储的优势就不复存在了。分析型 SQL 常常不会用到所有的列,而仅仅对其中某些感兴趣的列做运算,那一行中那些无关的列也不得不参与扫描。而使用了列式存储,可以只扫描我们需要的列,不需要将无关的列进行扫描,减少不必要的IO及磁盘检索消耗,提升读的性能。

列式存储就是为这样的需求设计的。如下图所示,同一列的数据被一个接一个紧挨着存放在一起,表的每列构成一个长数组。

img

img

列式存储的优点:

  • 自动索引

因为基于列存储,所以每一列本身就相当于索引。所以在做一些需要索引的操作时,就不需要额外的数据结构来为此列创建合适的索引。

  • 利于数据压缩

相同的列数据类型一致,这样利于数据结构填充的优化和压缩,而且对于数字列这种数据类型可以采取更多有利的算法去压缩存储。

总的来说,列式存储的优势一方面体现在存储上能节约空间、减少 IO,另一方面依靠列式数据结构做了计算上的优化。下面是行式存储与列式存储对比:

行式存储 行式存储 列式存储
特点 会扫描不需要的数据列 只读取需要的数据列
场景 适合于按记录读写数据的场景,不适合聚合统计的场景 适合于数据过滤、聚合统计的场景,不适合按记录一个一个读写场景
应用 OLTP OLAP
压缩 不利于压缩数据 适合压缩数据

ORC介绍

ORC(OptimizedRC File)文件格式是一种Hadoop生态圈中的列式存储格式,源自于RC(RecordColumnar File),它的产生早在2013年初,最初产生自Apache Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗,目前也被Spark SQL、Presto等查询引擎支持。2015年ORC项目被Apache项目基金会提升为Apache顶级项目。

ORC文件基本存储结构:

img

  • ORC官方明细结构图:

img

ORC文件也是以二进制方式存储的,所以是不可以直接读取,ORC文件也是自解析的,它包含许多的元数据,这些元数据都是同构ProtoBuffer进行序列化的。其中涉及到如下的概念:

ORC文件:保存在文件系统上的普通二进制文件,一个ORC文件中可以包含多个stripe,每一个stripe包含多条记录,这些记录按照列进行独立存储,对应到Parquet中的row group的概念。

文件级元数据:包括文件的描述信息PostScript、文件meta信息(包括整个文件的统计信息)、所有stripe的信息和文件schema信息。

stripe:一组行形成一个stripe,每次读取文件是以行组为单位的,一般为HDFS的块大小,保存了每一列的索引和数据。

stripe元数据:保存stripe的位置、每一个列的在该stripe的统计信息以及所有的stream类型和位置。

row group:索引的最小单位,一个stripe中包含多个row group,默认为10000个值组成。

stream:一个stream表示文件中一段有效的数据,包括索引和数据两类。索引stream保存每一个row group的位置和统计信息,数据stream包括多种类型的数据,具体需要哪几种是由该列类型和编码方式决定。

ORC文件中保存了三个层级的统计信息,分别为文件级别、stripe级别和row group级别的,他们都可以用来根据Search ARGuments(谓词下推条件)判断是否可以跳过某些数据,在统计信息中都包含成员数和是否有null值,并且对于不同类型的数据设置一些特定的统计信息。

  • 性能测试:
    • 原始Text格式,未压缩 : 38.1 G
    • ORC格式,默认压缩(ZLIB): 11.5 G
    • Parquet格式,默认压缩(Snappy):14.8 G
    • 测试对比:复杂数据Join关联测试

img

ORCFile工具类

  • 添加ORC与MapReduce集成的Maven依赖
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.6.3</version>
</dependency>
  • OrcOutputFormat:用于将结果写入ORC文件

img

  • OrcInputFormat:用于实现读取ORC文件类型

img

生成ORC文件

  • 需求:将普通文件转换为ORC文件

img

  • 思路

    • Step1:Input阶段读取普通文件
    • Step2:Map阶段直接输出数据,没有Reduce阶段
    • Step3:Output阶段使用OrcOutputFormat保存为ORC文件类型
  • 实现

    • 开发代码

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      import org.apache.orc.OrcConf;
      import org.apache.orc.TypeDescription;
      import org.apache.orc.mapred.OrcStruct;
      import org.apache.orc.mapreduce.OrcOutputFormat;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      
      /**
       * @ClassName WriteOrcFileApp
       * @Description TODO 用于读取普通文本文件转换为ORC文件
       */
      public class WriteOrcFileApp extends Configured implements Tool {
          // 作业名称
          private static final String JOB_NAME = WriteOrcFileApp.class.getSimpleName();
          // 构建日志监听
          private static final Logger LOG = LoggerFactory.getLogger(WriteOrcFileApp.class);
          // 定义数据的字段信息
          private static final String SCHEMA = "struct<id:string,type:string,orderID:string,bankCard:string,cardType:string,ctime:string,utime:string,remark:string>";
      
      
          /**
           * 重写Tool接口的run方法,用于提交作业
           * @param args
           * @return
           * @throws Exception
           */
          public int run(String[] args) throws Exception {
              // 设置Schema
              OrcConf.MAPRED_OUTPUT_SCHEMA.setString(this.getConf(), SCHEMA);
              // 实例化作业
              Job job = Job.getInstance(this.getConf(), JOB_NAME);
              // 设置作业的主程序
              job.setJarByClass(WriteOrcFileApp.class);
              // 设置作业的Mapper类
              job.setMapperClass(WriteOrcFileAppMapper.class);
              // 设置作业的输入为TextInputFormat(普通文本)
              job.setInputFormatClass(TextInputFormat.class);
              // 设置作业的输出为OrcOutputFormat
              job.setOutputFormatClass(OrcOutputFormat.class);
              // 设置作业使用0个Reduce(直接从map端输出)
              job.setNumReduceTasks(0);
              // 设置作业的输入路径
              FileInputFormat.addInputPath(job, new Path(args[0]));
              // 从参数中获取输出路径
              Path outputDir = new Path(args[1]);
              // 如果输出路径已存在则删除
              outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
              // 设置作业的输出路径
              OrcOutputFormat.setOutputPath(job, outputDir);
              // 提交作业并等待执行完成
              return job.waitForCompletion(true) ? 0 : 1;
          }
      
          //程序入口,调用run
          public static void main(String[] args) throws Exception {
              //用于管理当前程序的所有配置
              Configuration conf = new Configuration();
              int status = ToolRunner.run(conf, new WriteOrcFileApp(), args);
              System.exit(status);
          }
      
          /**
           * 实现Mapper类
           */
          public static class WriteOrcFileAppMapper extends Mapper<LongWritable, Text, NullWritable, OrcStruct> {
              // 获取字段描述信息
              private TypeDescription schema = TypeDescription.fromString(SCHEMA);
              // 构建输出的Key
              private final NullWritable outputKey = NullWritable.get();
              // 构建输出的Value为ORCStruct类型
              private final OrcStruct outputValue = (OrcStruct) OrcStruct.createValue(schema);
              public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
                  // 将读取到的每一行数据进行分割,得到所有字段
                  String[] fields = value.toString().split(",",8);
                  // 将所有字段赋值给Value中的列
                  outputValue.setFieldValue(0, new Text(fields[0]));
                  outputValue.setFieldValue(1, new Text(fields[1]));
                  outputValue.setFieldValue(2, new Text(fields[2]));
                  outputValue.setFieldValue(3, new Text(fields[3]));
                  outputValue.setFieldValue(4, new Text(fields[4]));
                  outputValue.setFieldValue(5, new Text(fields[5]));
                  outputValue.setFieldValue(6, new Text(fields[6]));
                  outputValue.setFieldValue(7, new Text(fields[7]));
                  // 输出KeyValue
                  output.write(outputKey, outputValue);
              }
          }
      }
  • 打成jar包

img

  • 提交yarn运行
yarn jar orcTest.jar bigdata.itcast.cn.mr.test.WriteOrcFileApp /datas/input/orc /datas/output/orc
  • 报错:缺少orc-mapreduce的jar包依赖

img

  • 解决:将orc-MapReduce的jar包添加到Hadoop的环境变量中,所有NodeManager节点都要添加
cp  orc-shims-1.6.3.jar  orc-core-1.6.3.jar  orc-mapreduce-1.6.3.jar  aircompressor-0.15.jar  hive-storage-api-2.7.1.jar /export/server/hadoop-3.1.4/share/hadoop/mapreduce/

img

  • 重新提交运行,运行成功,查看结果

img

img

读取ORC文件

  • 需求:读取ORC文件,还原成普通文本文件

  • 思路

    • Step1:Input阶段读取上一步当中生成的ORC文件
    • Step2:Map阶段直接读取输出
    • Step3:Output阶段将结果保存为普通文本文件
  • 实现

    • 开发代码

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      import org.apache.orc.mapred.OrcStruct;
      import org.apache.orc.mapreduce.OrcInputFormat;
      
      import java.io.IOException;
      
      /**
       * @ClassName ReadOrcFileApp
       * @Description TODO 读取ORC文件进行解析还原成普通文本文件
       */
      public class ReadOrcFileApp extends Configured implements Tool {
          // 作业名称
          private static final String JOB_NAME = WriteOrcFileApp.class.getSimpleName();
      
          /**
           * 重写Tool接口的run方法,用于提交作业
           * @param args
           * @return
           * @throws Exception
           */
          public int run(String[] args) throws Exception {
              // 实例化作业
              Job job = Job.getInstance(this.getConf(), JOB_NAME);
              // 设置作业的主程序
              job.setJarByClass(ReadOrcFileApp.class);
              // 设置作业的输入为OrcInputFormat
              job.setInputFormatClass(OrcInputFormat.class);
              // 设置作业的输入路径
              OrcInputFormat.addInputPath(job, new Path(args[0]));
              // 设置作业的Mapper类
              job.setMapperClass(ReadOrcFileAppMapper.class);
              // 设置作业使用0个Reduce(直接从map端输出)
              job.setNumReduceTasks(0);
              // 设置作业的输入为TextOutputFormat
              job.setOutputFormatClass(TextOutputFormat.class);
              // 从参数中获取输出路径
              Path outputDir = new Path(args[1]);
              // 如果输出路径已存在则删除
              outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
              // 设置作业的输出路径
              FileOutputFormat.setOutputPath(job, outputDir);
              // 提交作业并等待执行完成
              return job.waitForCompletion(true) ? 0 : 1;
          }
      
      
          // 程序入口,调用run
          public static void main(String[] args) throws Exception {
              // 用于管理当前程序的所有配置
              Configuration conf = new Configuration();
              int status = ToolRunner.run(conf, new ReadOrcFileApp(), args);
              System.exit(status);
          }
      
          /**
           * 实现Mapper类
           */
          public static class ReadOrcFileAppMapper extends Mapper<NullWritable, OrcStruct, NullWritable, Text> {
              private NullWritable outputKey;
              private Text outputValue;
              @Override
              protected void setup(Context context) throws IOException, InterruptedException {
                  outputKey = NullWritable.get();
                  outputValue = new Text();
              }
              public void map(NullWritable key, OrcStruct value, Context output) throws IOException, InterruptedException {
                  //将ORC中的每条数据转换为Text对象
                  this.outputValue.set(
                          value.getFieldValue(0).toString()+","+
                                  value.getFieldValue(1).toString()+","+
                                  value.getFieldValue(2).toString()+","+
                                  value.getFieldValue(3).toString()+","+
                                  value.getFieldValue(4).toString()+","+
                                  value.getFieldValue(5).toString()+","+
                                  value.getFieldValue(6).toString()+","+
                                  value.getFieldValue(7).toString()
                  );
                  //输出结果
                  output.write(outputKey, outputValue);
              }
          }
      
      }
  • 打成jar包,提交运行

yarn jar orcTest.jar bigdata.itcast.cn.mr.test.ReadOrcFileApp /datas/output/orc /datas/output/orc-text
  • 查看结果

img

img

数据压缩优化

压缩优化设计

运行MapReduce程序时,磁盘I/O操作、网络数据传输、shuffle和merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。如果磁盘I/O和网络带宽影响了MapReduce作业性能,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O和网络流量。

img

压缩是mapreduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度,它的优缺点如下:

  • 压缩的优点

    • 减小文件存储所占空间
    • 加快文件传输效率,从而提高系统的处理速度
    • 降低IO读写的次数
  • 压缩的缺点

    • 使用数据时需要先对文件解压,加重CPU负荷,压缩算法越复杂,解压时间越长

压缩支持

  • 检查Hadoop支持的压缩算法:hadoop checknative

img

  • Hadoop支持的压缩算法

img

  • 各压缩算法压缩性能对比
压缩算法 优点 缺点
Gzip 压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便 不支持split
Lzo 压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便 压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)
Bzip2 支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便 压缩/解压速度慢;不支持native
Snappy 压缩速度快;支持hadoop native库 不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令
  • 同样大小的数据对应压缩比

img

  • 压缩时间和解压时间

img

  • 从以上对比可以看出:压缩比越高,压缩时间越长,应当选择压缩比与压缩时间中等的压缩算法
  • Hadoop中的压缩位置及配置

img

  • Input压缩

Hadoop会自动检查压缩文件的扩展名,使用对应的解码器进行解码无需单独指定。

  • Map输出压缩

配置Map输出的结果进行压缩,需要指定以下属性来开启压缩机配置压缩算法类型

mapreduce.map.output.compress true
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec
  • Reduce输出压缩

配置Reduce输出的结果进行压缩,需要指定以下属性来开启压缩机配置压缩算法类型

mapreduce.output.fileoutputformat.compress true
mapreduce.output.fileoutputformat.compress.codec org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type RECORD

Gzip压缩

生成Gzip压缩文件

(1) 需求:读取普通文本文件,将普通文本文件压缩为Gzip格式

(2) 思路

a) Step1:Input读取普通文本文件

b) Step2:Map和Reduce直接输出

c) Step3:配置Output输

d) 出压缩为Gzip格式

(3) 代码开发

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRWriteGzip
 * @Description TODO 读取普通文件数据,对数据以Gzip格式进行压缩
 */
public class MRWriteGzip extends Configured implements Tool {

    // 构建、配置、提交一个 MapReduce的Job
    public int run(String[] args) throws Exception {

        // 构建Job
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(MRWriteGzip.class);

        // input:配置输入
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job,inputPath);

        // map:配置Map
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce:配置Reduce
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // output:配置输出
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job,outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // 程序入口,调用run
    public static void main(String[] args) throws Exception {
        // 用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        // 配置输出结果压缩为Gzip格式
        conf.set("mapreduce.output.fileoutputformat.compress","true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.GzipCodec");
        // 调用run方法,提交运行Job
        int status = ToolRunner.run(conf, new MRWriteGzip(), args);
        System.exit(status);
    }


    /**
     * 定义Mapper类
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text>{

        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            context.write(this.outputKey,value);
        }
    }

    /**
     * 定义Reduce类
     */
    public static class MrReduce extends Reducer<NullWritable,Text,NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

}

(4) 提交运行

yarn jar compress.jar bigdata.itcast.cn.mr.test.MRWriteGzip /datas/input/compress /datas/output/compress/gzip1

(5) 查看结果

img

img

读取Gzip压缩文件

(1) 需求:读取Gzip压缩文件,还原为普通文本文件

(2) 思路

a) Step1:Input直接读取上一步的压缩结果文件

b) Step2:Map和Reduce直接输出

c) Step3:Output将结果保存为普通文本文件

(3) 代码开发

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRReadGzip
 * @Description TODO 读取Gzip格式的数据,还原为普通文本文件
 */
public class MRReadGzip extends Configured implements Tool {

    //构建、配置、提交一个 MapReduce的Job
    public int run(String[] args) throws Exception {

        // 构建Job
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(MRReadGzip.class);

        // input:配置输入
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job,inputPath);

        // map:配置Map
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce:配置Reduce
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        // output:配置输出
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job,outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    //程序入口,调用run
    public static void main(String[] args) throws Exception {
        // 用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        // 配置输出结果压缩为Gzip格式
//        conf.set("mapreduce.output.fileoutputformat.compress","true");
//        conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.GzipCodec");
        // 调用run方法,提交运行Job
        int status = ToolRunner.run(conf, new MRReadGzip(), args);
        System.exit(status);
    }


    /**
     * 定义Mapper类
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text>{

        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //直接输出每条数据
            context.write(this.outputKey,value);
        }
    }

    /**
     * 定义Reduce类
     */
    public static class MrReduce extends Reducer<NullWritable, Text,NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //直接输出每条数据
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

}

(4) 提交运行

yarn jar compress.jar bigdata.itcast.cn.mr.test.MRReadGzip /datas/output/compress/gzip1 /datas/output/compress/gzip2

(5) 查看结果

img

img

Snappy压缩

配置Hadoop支持Snappy

Hadoop支持Snappy类型的压缩算法,并且也是最常用的一种压缩算法,但是Hadoop官方已编译的安装包中并没有提供Snappy的支持,所以如果想使用Snappy压缩,必须下载Hadoop源码,自己进行编译,在编译时添加Snappy的支持,具体编译过程请参考《Hadoop3编译安装》手册。

生成Snappy压缩文件:Map输出不压缩

(1) 需求:读取普通文本文件,转换为Snappy压缩文件

(2) 思路

a) Step1:Input读取普通文本文件

b) Step2:Map和Reduce直接输出

c) Step3:Output配置输出压缩为Snappy类型

(3) 代码开发

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRWriteSnappy
 * @Description TODO 读取普通文件数据,对数据以Snappy格式进行压缩
 */
public class MRWriteSnappy extends Configured implements Tool {

    /**
    * 构建、配置、提交一个 MapReduce的Job
    */
    public int run(String[] args) throws Exception {

        // 构建Job
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(MRWriteSnappy.class);

        // input:配置输入
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job,inputPath);
        // map:配置Map
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce:配置Reduce
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        // output:配置输出
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job,outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // 程序入口,调用run
    public static void main(String[] args) throws Exception {
        // 用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        // 配置输出结果压缩为Snappy格式
        conf.set("mapreduce.output.fileoutputformat.compress","true");
  conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 调用run方法,提交运行Job
        int status = ToolRunner.run(conf, new MRWriteSnappy(), args);
        System.exit(status);
    }


    /**
     * 定义Mapper类
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text>{

        private NullWritable outputKey = NullWritable.get();
         @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            context.write(this.outputKey,value);
        }
    }

    /**
     * 定义Reduce类
     */
    public static class MrReduce extends Reducer<NullWritable, Text,NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

}

(4) 提交运行

yarn jar compress.jar bigdata.itcast.cn.mr.test.MRWriteSnappy /datas/input/compress /datas/output/compress/snappy1

(5) 查看结果

img

img

生成Snappy压缩文件:Map输出压缩

(1) 需求:读取普通文本文件,转换为Snappy压缩文件,并对Map输出的结果使用Snappy压缩

(2) 思路

a) 将上一步的代码中添加Map输出压缩的配置

(3) 代码开发

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRMapOutputSnappy
 * @Description TODO 读取普通文件数据,对Map输出的数据以Snappy格式进行压缩
 */
public class MRMapOutputSnappy extends Configured implements Tool {

    // 构建、配置、提交一个 MapReduce的Job
    public int run(String[] args) throws Exception {

        // 构建Job
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(MRMapOutputSnappy.class);

        // input:配置输入
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job,inputPath);

        // map:配置Map
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce:配置Reduce
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        // output:配置输出
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job,outputPath);
         return job.waitForCompletion(true) ? 0 : -1;
    }

    // 程序入口,调用run
    public static void main(String[] args) throws Exception {
        // 用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        // 配置Map输出结果压缩为Snappy格式
        conf.set("mapreduce.map.output.compress","true");
        conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 配置Reduce输出结果压缩为Snappy格式
        conf.set("mapreduce.output.fileoutputformat.compress","true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 调用run方法,提交运行Job
        int status = ToolRunner.run(conf, new MRMapOutputSnappy(), args);
        System.exit(status);
    }


    /**
     * 定义Mapper类
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text>{

        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            context.write(this.outputKey,value);
        }
    }

    /**
     * 定义Reduce类
     */
    public static class MrReduce extends Reducer<NullWritable, Text,NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

}

(4) 提交运行

yarn jar compress.jar bigdata.itcast.cn.mr.test.MRMapOutputSnappy /datas/input/compress /datas/output/compress/snappy2

(5) 查看结果

img

img

读取Snappy压缩文件

(1) 需求:读取上一步生成的Snappy文件,还原为普通文本文件。

(2) 思路

a) Step1:Input读取Snappy文件

b) Step2:Map和Reduce直接输出

c) Step3:Output直接输出为普通文本类型

(4) 代码开发

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
/**
 * @ClassName MRReadSnappy
 * @Description TODO 读取Snappy格式的数据,还原为普通文本文件
 */
public class MRReadSnappy extends Configured implements Tool {

    // 构建、配置、提交一个 MapReduce的Job
    public int run(String[] args) throws Exception {

        // 构建Job
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(MRReadSnappy.class);

        // input:配置输入
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job,inputPath);

        // map:配置Map
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce:配置Reduce
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output:配置输出
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job,outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    //程序入口,调用run
    public static void main(String[] args) throws Exception {
        // 用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        // 调用run方法,提交运行Job
        int status = ToolRunner.run(conf, new MRReadSnappy(), args);
         System.exit(status);
    }


    /**
     * 定义Mapper类
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text>{

        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //直接输出每条数据
            context.write(this.outputKey,value);
        }
    }

    /**
     * 定义Reduce类
     */
    public static class MrReduce extends Reducer<NullWritable, Text,NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //直接输出每条数据
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

}

(4) 提交运行

yarn jar compress.jar bigdata.itcast.cn.mr.test.MRReadSnappy /datas/output/compress/snappy2 /datas/output/compress/snappy3

(6) 查看结果

img

Lzo压缩

配置Hadoop支持Lzo

Hadoop本身不支持Lzo类型的压缩,需要额外单独安装,并在编译时添加Lzo的压缩算法支持,编译过程请参考编译手册《Apache Hadoop3-1-3编译安装部署lzo压缩指南》。

编译完成后,请实现以下配置,让当前的Hadoop支持Lzo压缩

  • 添加lzo支持jar包

cp hadoop-lzo-0.4.21-SNAPSHOT.jar /export/server/hadoop-3.1.4/share/hadoop/common/

img

  • 同步到所有节点
cd  /export/server/hadoop-3.1.4/share/hadoop/common/
scp hadoop-lzo-0.4.21-SNAPSHOT.jar node2:$PWD
scp hadoop-lzo-0.4.21-SNAPSHOT.jar node3:$PWD
  • 修改core-site.xml
<property>
	<name>io.compression.codecs</name>
	<value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>
<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
  • 同步core-site.xml到其他所有节点
cd  /export/server/hadoop-3.1.4/etc/hadoop
scp  core-site.xml node2:$PWD
scp  core-site.xml node3:$PWD
  • 重新启动Hadoop集群

生成Lzo压缩文件

(1) 需求:读取普通文本文件,生成Lzo压缩结果文件

(2) 思路

a) Step1:读取普通文本文件

b) Step2:Map和Reduce直接输出

c) Step3:配置Output输出压缩为Lzo类型

(3) 代码开发

注意:Lzo在Hadoop中有两种类型的算法编码

  • 第一种:LzoCodec
    • 包:org.apache.hadoop.io.compress.LzoCodec
    • 特点:结尾为.lzo_deflate,不能构建索引,不兼容lzop
    • 应用:Map输出压缩,不用于Reduce输出压缩
  • 第二种:LzopCodec
    • 包:com.hadoop.compression.lzo.LzopCodec
    • 特点:结尾为.lzo,可以构建索引,兼容lzo
    • 应用:一般用于Reduce输出压缩
  • 本案例中使用Lzop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRWriteLzo
 * @Description TODO 读取普通文件数据,对数据以Lzo格式进行压缩
 */
public class MRWriteLzo extends Configured implements Tool {

    // 构建、配置、提交一个 MapReduce的Job
    public int run(String[] args) throws Exception {

        // 构建Job
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(MRWriteLzo.class);

        // input:配置输入
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job,inputPath);
        // map:配置Map
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce:配置Reduce
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        // output:配置输出
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job,outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    //程序入口,调用run
    public static void main(String[] args) throws Exception {
        // 用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        // 配置输出结果压缩为Lzo格式
        conf.set("mapreduce.output.fileoutputformat.compress","true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec","com.hadoop.compression.lzo.LzopCodec");
        // 调用run方法,提交运行Job
        int status = ToolRunner.run(conf, new MRWriteLzo(), args);
        System.exit(status);
    }


    /**
     * 定义Mapper类
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text>{

        private NullWritable outputKey = NullWritable.get();
         @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            context.write(this.outputKey,value);
        }
    }

    /**
     * 定义Reduce类
     */
    public static class MrReduce extends Reducer<NullWritable, Text,NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // 直接输出每条数据
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

}

(4) 提交运行

yarn jar lzo.jar bigdata.itcast.cn.mr.test.MRWriteLzo /datas/input/compress/ /datas/output/compress/lzo1

(5) 查看结果

img

img

读取Lzo压缩文件

(1) 需求:读取Lzo压缩文件,恢复为普通文本文件

(2) 代码开发

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRReadLzo
 * @Description TODO 读取Lzo格式的数据,还原为普通文本文件
 */
public class MRReadLzo extends Configured implements Tool {

    //构建、配置、提交一个 MapReduce的Job
    public int run(String[] args) throws Exception {

        //构建Job
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(MRReadLzo.class);

        //input:配置输入
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job,inputPath);

        //map:配置Map
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        //reduce:配置Reduce
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        //output:配置输出
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job,outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
	}
     //程序入口,调用run
    public static void main(String[] args) throws Exception {
        //用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        //配置输出结果压缩为Gzip格式
//        conf.set("mapreduce.output.fileoutputformat.compress","true");
//        conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.GzipCodec");
        //调用run方法,提交运行Job
        int status = ToolRunner.run(conf, new MRReadLzo(), args);
        System.exit(status);
    }


    /**
     * 定义Mapper类
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text>{

        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //直接输出每条数据
            context.write(this.outputKey,value);
        }
    }

    /**
     * 定义Reduce类
     */
    public static class MrReduce extends Reducer<NullWritable, Text,NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //直接输出每条数据
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

}

(3) 提交运行

yarn jar lzo.jar bigdata.itcast.cn.mr.test.MRReadLzo /datas/output/compress/lzo1 /datas/output/compress/lzo2

(4) 查看结果

img

(5) 调整分片大小为30M,重新测试

yarn jar lzo.jar bigdata.itcast.cn.mr.test.MRReadLzo \
-D mapreduce.input.fileinputformat.split.maxsize=31457280 \
/datas/output/compress/lzo1 /datas/output/compress/lzo3

img

问题:分片设置未生效

原因:Lzo类型的文件默认不论多大,都只作为一个分片,只有一个MapTask

生成Lzo索引

(1) 需求:读取Lzo文件,按照分片大小生成MapTask个数

(2) 思路

a) step1:先基于Lzo文件构建Lzo文件索引

b) step2:使用LzoTextInputFormat对Lzo文件根据索引进行划分分片读取

(3) 先构建Lzo索引

yarn jar /export/server/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /datas/output/compress/lzo1/part-r-00000.lzo 

img

img

(4) 提交运行

yarn jar lzo.jar bigdata.itcast.cn.mr.test.MRReadLzo \
-D mapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat \
-D mapreduce.input.fileinputformat.split.maxsize=31457280 \
 /datas/output/compress/lzo1 /datas/output/compress/lzo4

(5) 查看结果

img

MapReduce属性优化

属性优化概述

MapReduce的核心优化在于修改数据文件类型、合并小文件、使用压缩等方式,通过降低IO开销来提升MapReduce过程中Task的执行效率。除此之外,MapReduce中也可以通过调节一些参数来从整体上提升MapReduce的性能。

基准测试

功能

可以通过基准测试来测试MapReduce集群对应的性能,观察实施了优化以后的MapReduce的性能是否得到提升等。

MapReduce中自带了基准测试的工具jar包,只要运行即可看到自带的测试工具类

yarn jar /export/server/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.4-tests.jar

img

我们选择常用的基准测试工具来实现基准测试

MR Bench

  • 功能:用于指定生成文件,MapTask、ReduceTask的个数,并且可以指定执行的次数

  • 语法

    mrbench
    [-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>]
    [-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>] 
    [-numRuns <number of times to run the job, default is 1>]
    [-maps <number of maps for each run, default is 2>] 
    [-reduces <number of reduces for each run, default is 1>] 
    [-inputLines <number of input lines to generate, default is 1>] 
    [-inputType <type of input to generate, one of ascending (default), descending, random>]
  • 例如:生成每个文件10000行,100个mapper,20个reducer,执行20次

yarn jar /export/server/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.4-tests.jar mrbench -numRuns 20 -inputLines 10000 -maps 100 -reduces 20
  • 测试
yarn jar /export/server/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.4-tests.jar mrbench -numRuns 1 -inputLines 100 -maps 5 -reduces 1

img

Load Gen

  • 功能:指定对某个数据进行加载,处理,测试性能耗时,可以调整Map和Reduce个数

  • 语法

    Usage: [-m <maps>] [-r <reduces>]
          [-keepmap <percent>] [-keepred <percent>]
           [-indir <path>] [-outdir <path]
           [-inFormat[Indirect] <InputFormat>] [-outFormat <OutputFormat>]
           [-outKey <WritableComparable>] [-outValue <Writable>]
  • 测试:对150M的数据进行测试,10个Map,1个Reduce

yarn jar /export/server/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.4-tests.jar loadgen -m 10 -r 1 -indir /datas/input/compress -outdir /datas/output/load

img

Uber模式

功能

我们在实际使用MapReduce过程会发现一种特殊情况,当运行于Hadoop集群上的一些mapreduce作业本身的数据量并不是很大,如果此时的任务分片很多,那么为每个map任务或者reduce任务频繁创建Container,势必会增加Hadoop集群的资源消耗,并且因为创建分配Container本身的开销,还会增加这些任务的运行时延。面临这个问题,如果能将这些小任务都放入少量的Container中执行,将会解决这些问题。MapReduce中就提供了这样的解决方案,Uber模式。

Uber运行模式对小作业进行优化,不会给每个任务分别申请分配Container资源,这些小任务将统一在一个Container中按照先执行map任务后执行reduce任务的顺序串行执行

使用

  • 开启

​ mapreduce.job.ubertask.enable=true(默认为false)

  • 条件
    • map任务的数量不大于mapreduce.job.ubertask.maxmaps参数(默认值是9)
    • reduce任务的数量不大于mapreduce.job.ubertask.maxreduces参数(默认值是1)
    • 输入文件大小不大于mapreduce.job.ubertask.maxbytes参数(默认为1个块的大小128M)
    • map任务和reduce任务需要的资源量不能大于MRAppMaster可用的资源总量

JVM重用

功能

JVM正常指代一个Java进程,Hadoop默认使用派生的JVM来执行map-reducer,如果一个MapReduce程序中有100个Map,10个Reduce,Hadoop默认会为每个Task启动一个JVM来运行,那么就会启动100个JVM来运行MapTask,在JVM启动时内存开销大,尤其是Job大数据量情况,如果单个Task数据量比较小,也会导致JVM资源,这就导致了资源紧张及浪费的情况。

为了解决上述问题,MapReduce中提供了JVM重用机制来解决,JVM重用可以使得JVM实例在同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task运行,直到运行了N个Task以后,就会释放,N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。

使用

  • Hadoop3之前的配置,在mapred-site.xml中添加以下参数

mapreduce.job.jvm.numtasks=10

  • Hadoop3中已不再支持该选项

重试机制

功能

当MapReduce运行过程中,如果出现MapTask或者ReduceTask由于网络、资源等外部因素导致Task失败,AppMaster会检测到Task的任务失败,会立即重新分配资源,重新运行该失败的Task,默认情况下会重试4次,如果重试4次以后依旧没有运行成功,那么整个Job会终止,程序运行失败。我们可以根据实际的情况来调节重试的次数。

使用

  • 可以在mapred-site.xml中修改

  • MapTask失败后的重试次数:默认4次

    mapreduce.map.maxattempts=4

  • ReduceTask失败后的重试次数:默认4次

​ mapreduce.reduce.maxattempts = 4

推测执行

功能

MapReduce模型将Job作业分解成Task任务,然后并行的运行Task任务,使整个Job作业整体执行时间少于各个任务顺序执行时间。这会导致作业执行时间多,但运行缓慢的任务很敏感,因为运行一个缓慢的Task任务会使整个Job作业所用的时间远远长于其他Task任务的时间。

当一个Job有成百上千个Task时,可能会出现拖后腿的Task任务。Task任务执行缓慢可能有多种原因,包括硬件老化或软件配置错误,但检测问题具体原因很难,因为任务总能成功执行完,尽管比预计时间长。Hadoop不会尝试诊断或修复执行慢的任务。

推测执行时指在一个Task任务运行比预期慢时,程序会尽量检测并启动另一个相同的任务作为备份,这就是推测执行(speculative execution),但是如果同时启动两个相同的任务他们会相互竞争,导致推测执行无法正常工作,这对集群资源是一种浪费。

只有在一个Job作业的所有Task任务都启动之后才会启动推测任务,并只针对已经运行一段时间(至少一分钟)且比作业中其他任务平均进度慢的任务。一个任务成功完成后任何正在运行的重复任务都将中止。如果原任务在推测任务之前完成则推测任务就会被中止,同样,如果推测任务先完成则原任务就会被中止。

推测任务只是一种优化措施,它并不能使Job作业运行的更加可靠。如果有一些软件缺陷造成的任务挂起或运行速度慢,依靠推测执行是不能成功解决的。默认情况下推测执行是启用的,可根据集群或每个作业,单独为map任务或reduce任务启用或禁用该功能。

使用

  • 在mapred-site.xml中修改默认属性
  • MapTask的推测执行机制:默认开启

mapreduce.map.speculative = false

  • ReduceTask的推测执行机制:默认开启

mapreduce.reduce.speculative = false

其他属性优化

开启小文件合并优化

  • 针对于小文件,默认每个小文件会构建一个分片,可以使用CombineTextInputFormat代替TextInputFormat,将多个小文件合并为一个分片

  • 实现

    //设置输入类
    job.setInputFormatClass(CombineTextInputFormat.class);
    //设置输入目录
    CombineTextInputFormat.setInputPaths(job,inputPath);
    //设置输入最大分片
    CombineTextInputFormat.setMaxInputSplitSize(job,134217728);
    //设置输入最小分片
    CombineTextInputFormat.setMinInputSplitSize(job,104857600);

减少Shuffle的Spill和Merge

  • 减少spill,默认每个缓冲区大小为100M,每次达到80%开始spill,如果调大这两个值,可以减少数据spill的次数,从而减少磁盘IO

    • 修改mapred-site.xml
      • mapreduce.task.io.sort.mb = 200
      • mapreduce.map.sort.spill.percent = 0.9
  • 减少merge,默认每次生成10个小文件开始进行合并,如果增大文件个数,可以减少merge的次数,从而减少磁盘IO

    • 修改mapred-site.xml
      • mapreduce.task.io.sort.factor = 15

开启Reduce端缓存

  • 默认情况下Reduce端会将数据从Buffer缓存写入磁盘,然后再从磁盘读取数据进行处理,Buffer中不会存储数据,如果内存允许的情况下,我们可以直接将Buffer的数据缓存在内存中,读取时直接从内存中获取数据。
  • 修改mapred-site.xml,指定20%内存空间用来存储Buffer的数据,默认为0

mapreduce.reduce.input.buffer.percent = 0.2

本章Maven依赖

<properties>
    <hadoop.version>3.1.4</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
         <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- ORC依赖 -->
    <dependency>
        <groupId>org.apache.orc</groupId>
        <artifactId>orc-mapreduce</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
</dependencies>