avatar

目录
Hadoop学习笔记(三)MapReduce

Hadoop学习笔记(三)MapReduce

一、简介

一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的小数据块,这些小数据块可以被多个Map任务并行处理。Map任务生成的结果会作为Reduce任务的输入,并最终由Reduce输出最终结果。(MapReduce的应用程序不一定要用JAVA来写)

二、MR编程(JAVA)

2.1 编写程序流程简介

在JAVA编程中,MR程序编写主要流程分为:

  1. 编写Mapper类
  2. 编写Reduce类
  3. 编写Job启动类
  4. 打包运行

2.2 MR程序入门示例(WordCount)

java
package cn.wangbowen.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
* WordCount class
* 单词计数MapReduce程序
*
* @author BoWenWang
* @date 2020/2/17 0:37
*/
public class WordCount {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取运行参数(其中至少一个输入路径以及一个输出路径)
Configuration conf = new Configuration();
String[] remainingArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (remainingArgs.length < 2) {
System.err.println("Usage: WordCount <in> [<in>...] <out>");
System.exit(2);
}
// 创建Job任务
Job job = Job.getInstance(conf, "WordCount");
// 设置Jar包运行的类
job.setJarByClass(WordCount.class);
// 设置Map和Reduce类
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReduce.class);
// 设置reduce输出的键值对类型(这里其实map和reduce同时设置了)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置map输出的键值对类型(如果map和reduce输出键值对类型一样,下面两个就不用设置)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 指定原始数据存放在路径
for (int i = 0; i < remainingArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(remainingArgs[i]));
}
// 指定处理输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[remainingArgs.length - 1]));
// 将job提交给集群运行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

/**
* Map类
* 4个泛型中,前两个是指定mapper输入数据的类型。输入输出都是以键值对形式封装的。
* 其中LongWritable是对Long的封装,Text是对String的封装。
* 默认情况下,框架传递给我的mapper的输入数据中,key是要处理文本中一行的起始偏移量,这一行的内容作为value
*/
public static class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private static LongWritable NUM = new LongWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将这一行的内容转为String类型字符串
String line = value.toString();
// 拆分单词
String[] words = line.split(" ");
for (String word : words) {
// 将每个单词输出为以键值对形式
context.write(new Text(word), NUM);
}
}
}

/**
* Reduce类
* 框架在Map处理之后,将所有键值对分组,传递一个组,调用一次reduce方法
*/
public static class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0L;
// 遍历累加求和
for (LongWritable value : values) {
count += value.get();
}
// 输出这一个单词的统计结果
context.write(key, new LongWritable(count));
}
}
}

2.3 运行MR程序

本地模式(IDEA)

  1. 创建文本文件作为输入数据文件

    avatar

  2. 设置运行参数,根据自己实际的目录(这里采用IDEA编译器)

    avatar

    avatar

  3. 运行main函数

    avatar

集群模式

  1. 修改pom文件,添加打包方式

    xml
    <packaging>jar</packaging>
  2. 打包程序

    avatar

  3. 将打包文件上传到集群上,或者设置一个共享文件夹让本地和集群联通

    avatar

  4. 启动HDFS,并在HDFS创建输入文件

    Code
    // 启动HDFS
    $start-dfs.sh
    $start-yarn.sh
    // 创建输入文件
    $hdfs dfs -mkdir -p /user/wbw/mr/word_count
    $hdfs dfs -put ~/sfs/words.txt /user/wbw/mr/word_count/
    // 运行命令
    // 解析:$hadoop jar <jar文件> <全路径主类名> <HDFS输入数据源目录可以多个> <HDFS输出结果目录>
    $hadoop jar Hadoop-1.0-SNAPSHOT.jar cn.wangbowen.mr.WordCount hdfs://s201/user/wbw/mr/word_count/ hdfs://s201/user/wbw/mr/word_count/out
  5. 打开WEB-UI(主机名:8088)最终会显示成功

    avatar

  6. 查看结果

    Code
    [wbw@s201 /home/wbw/sfs]$hdfs dfs -cat /user/wbw/mr/word_count/out/part-r-00000
    apple 1
    hello 3
    tom 2
    world 2
  7. 可能会遇到的问题

    Code
    Please check whether your etc/hadoop/mapred-site.xml contains the below configuration:
    <property>
    <name>yarn.app.mapreduce.am.env</name>
    <value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
    </property>
    <property>
    <name>mapreduce.map.env</name>
    <value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
    </property>
    <property>
    <name>mapreduce.reduce.env</name>
    <value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
    </property>

    解决方法:按错误提示,在mapred-site.xml配置文件中添加hadoop根目录

2.4 Chain链式MR编程

简介

有时候一个MR程序,并不能解决我们的问题,因而需要多个MR程序配合使用,前一个MR的输出结果作为后一个MR程序的输入。又或者我们写完一个MR以后,过一段时间,需要增加一些条件,那我们还要重新对代码进行编码。这样工作量无疑大大增加了,因此,现在有一个ChainMapper和ChainReducer来帮我们处理这些问题,使MR程序可重用。

一个MR程序,是由MapTask和ReduceTask两个阶段组成。在MapTask阶段可以有>=1个Mapper类,在ReduceTask阶段,可以由一个Reduce类+[>=0个Mapper类]组成。因此,一个MR程序可以有如下顺序执行结构:[Mapper] –> [Mapper] –> [Mapper] –> [Reduce] –> [Mapper] –> [Mapper](但是,最基本的必须由一个Mapper+一个Reduce组成)这样就组成了一条链子样子的链式处理。同理,第一个Mapper的输出,可以作为第二Mapper的输入,第三个Mapper的输出可以作为Reduce的输入,Reduce的输出可以作为后面Mapper的输入,以此类推。

使用

  1. 需求:统计单词数量(要求长度大于等于5,且总数大于3)

  2. 数据准备:

    Code
    apple apple apple apple apple 
    pink pink pink pink pink
    hello hello hello

    可以看到apple符合要求,pink总数符合但是长度不符合,hello长度符合但是总数不合符,因此最终输出结果应该是《apple 5》

  3. 编写Mapper类

    • WCChain(主类)

      !!!特别要注意ChainMapper和ChainReducer调用,别复制太快了!!!

      java
      /**
      * WCChain class
      * MR主类
      *
      * @author BoWenWang
      * @date 2020/2/21 23:43
      */
      public class WCChain {
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
      // 获取运行参数(其中至少一个输入路径以及一个输出路径)
      Configuration conf = new Configuration();
      String[] remainingArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
      if (remainingArgs.length < 2) {
      System.err.println("Usage: WordCountChain <in> [<in>...] <out>");
      System.exit(2);
      }
      // 创建Job任务
      Job job = Job.getInstance(conf, "WordCountChain");
      // 设置Jar包运行的类
      job.setJarByClass(WCChain.class);
      /**
      * 添加MapperTasks(注意是ChainMapper调用!)
      * addMapper参数介绍:
      * #1 :Job任务
      * #2 :Mapper类
      * #3、4 :输入数据K、V类型类
      * #5、6 :输出数据K、V类型类
      * #7 :配置文件(可以用上面的 conf)
      */
      ChainMapper.addMapper(job, Mapper1.class, LongWritable.class, Text.class, Text.class, IntWritable.class, conf);
      ChainMapper.addMapper(job, Mapper2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
      /**
      * 添加ReduceTasks(注意都是ChainReducer调用!)
      * setReducer参数介绍:
      * #1 :Job任务
      * #2 :Reduce类
      * #3、4 :输入数据K、V类型类
      * #5、6 :输出数据K、V类型类
      * #7 :配置文件(可以用上面的 conf)
      */
      ChainReducer.setReducer(job, Reduce1.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
      ChainReducer.addMapper(job, Mapper3.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
      // 指定原始数据存放在路径
      for (int i = 0; i < remainingArgs.length - 1; i++) {
      FileInputFormat.addInputPath(job, new Path(remainingArgs[i]));
      }
      // 指定处理输出数据存放路径
      FileOutputFormat.setOutputPath(job, new Path(remainingArgs[remainingArgs.length - 1]));
      // 将job提交给集群运行
      System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
      }
    • Mapper、Reduce类

      java
      /**
      * Mapper1 class
      * 第一个Mapper处理文本文件输入数据,输出所有K-V:单词-1
      *
      * @author BoWenWang
      * @date 2020/2/21 23:31
      */
      public class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
      private static IntWritable SUM = new IntWritable(1);
      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      String[] words = line.split(" ");
      for (String word : words) {
      context.write(new Text(word), SUM);
      }
      }
      }
      java
      /**
      * Mapper2 class
      * 第二个Mapper过滤长度<5的单词
      * 输入为Mapper1的输出,即K-V:单词-1
      * 输出为K-V:单词-1
      *
      * @author BoWenWang
      * @date 2020/2/21 23:31
      */
      public class Mapper2 extends Mapper<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
      if (key.toString().length() >= 5) {
      context.write(key, value);
      }
      }
      }
      java
      /**
      * Reducer1 class
      * 这是MR唯一一个Reduce类,用来统计各单词总数
      * 输入为Mapper2的输入K-V: 单词-1
      * 输出为K-V: 单词-总数
      *
      * @author BoWenWang
      * @date 2020/2/21 23:38
      */
      public class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int count = 0;
      for (IntWritable value : values) {
      count += value.get();
      }
      context.write(key, new IntWritable(count));
      }
      }
      java
      /**
      * Mapper3 class
      * 第三个Mapper过滤总数小于等于3的单词
      * 输入为Reduce的输出,即K-V:单词-总数
      * 输出为K-V:符合要求的单词-总数
      *
      * @author BoWenWang
      * @date 2020/2/21 23:31
      */
      public class Mapper3 extends Mapper<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
      if (value.get() > 3) {
      context.write(key, value);
      }
      }
      }

三、MR工作流程

3.1 整体流程简介

  1. MapReduce框架使用InputFormat模块做Map前的预处理,然后将输入文件分为逻辑上的多个InputSplit(只是一个逻辑概念,并没有实际切分。只是记录了要处理数据的位置和长度)

  2. 通过RecordReader根据InputSplit中的信息来处理具体记录,加载数据并转化为合适Map任务读取的键值对给Map任务

  3. Map任务根据用户自定义的映射规则,输出一系列<key,value>作为中间结果

  4. 经过Shuffle(对Map的输出进行一定的分区、排序、合并、归并等操作),得到<key,value-list>形式的中间结果

  5. Reduce以<key,value-list>中间结果作为输入,执行用户自定义的逻辑,输出结果给OutputFormat模块

  6. OutputFormat模块会验证输出目录是否已经存在以及输出结果类型是否符合配置文件中的配置类型。都满足就会输出结果到分布式文件系统

3.2 Job提交流程详解

参考另一篇文章Yarn

3.3 job提交流程源码解析

本地模式

这里我们对2.2中的代码进行断点追踪来分析一下本地运行流程:

  1. 创建外部Job(mapreduce.Job),设置配置信息

    即为2.2中main方法里面的内容。

  1. 通过jobsubmitter作业提交器将job.xml + split等文件写入临时目录

    avatar

    我们可以看到这里创建了一个临时目录,用来放提交任务临时文件。

    avatar

    接着将编写的job信息拷贝和配置文件方法。

    avatar

    创建提交任务文件。

    avatar

    计算切片数量(map数量),下一行是写入到conf对象。

    avatar

    将配置信息写入到提交作业文件。

    avatar

    打开临时文件目录,打开job.xml发现,程序里面给job.set都是给job.xml配置作业属性。同时目录下还有切片信息。

  1. 通过jobSubmitter提交job给localJobRunner

    avatar

  1. LocalJobRunner将外部Job 转换成成内部Job

    avatar

    这个job是内部类,在LocalJobRunner。如果是集群就是YARNRunner。所以在提交作业的时候,先把job转换成内部类。

  1. 内部Job线程,开放分线程执行job

    avatar

    进入内部类job的构造函数,发现最底下有一个start函数。因此这个内部类是个线程类,找到它的run方法,打断点进入发现确实开启一个分线程。

    avatar

    进入run函数中,发现这个变量就是用来存放MAP后的键值对。

    avatar

    接着发现,这里可以看到mapRunnables为map的线程数,以及reduce任务数。

    avatar

    avatar

    继续查看发现它又进行了,获取切片信息,获得map任务集合,以及运行map任务,以及之后的reduce任务。

    avatar

    从结构上可以看到,Job内部类其实又有两个线程类。

    avatar

    进入Map的runTask,可以看到这里又有一个submit,以及后面的debug值,发现这又开了一个线程,这其实就是上面结构图中MapTaskRunnable线程类。

  1. job执行线程分别计算Map和reduce任务信息,并通过线程池孵化新线程执行MR任务。

    avatar

    avatar

    可以看到这里创建了Map任务对象和,以及船舰输出文件,map毕竟也是要将结果输出给reduce的。最后又执行了run方法。

    avatar

    进入run后,发现有一个runNewMapper。

    avatar

    进入runNewMapper。发现了上下文对象名(可以联想到自己写mapper的时候那个参数context),反射工具库类,以及这个mapper对象值是自己写的Map类名信息。

    avatar

    最后这个mapper执行了run方法(通过线程池孵化新线程执行MR任务)。

    avatar

    进入run方法。这里有3个过程,其中我们只编写了第二部分的内容,即自己的mapper类。

    avatar

    继续打断点,发现程序进入到了我们自己写的方法里面了,可以看出我们写的代码逻辑到现在才进行运行。

  1. 接下来reduce的过程也是类似的

    avatar

    这里有一些排序等操作。

    avatar

    可以看到在执行reduce前有一个shuffle的操作。

    avatar

    avatar

    avatar

    至此程序就结束了,然后输出结果。

四、Shuffle机制(map后reduce前)

4.1流程简介

avatar

MapTask阶段

  1. 输入数据和执行Map任务

    Map任务接受<key,value>作为输入,按一定规则以<key,value>输出。但不会直接输出到磁盘上,而是先写入一个环形缓冲区中。

  2. 溢写(分区、排序、可选combiner)

    每个Map任务都会分配一个环形缓冲区(默认100M,可以在io.sort.mb属性修改),随着任务的进行,缓存不断增大,当达到溢写比例0.8(io.sort.spill.percent属性)后,先将80M写入磁盘(即,一个后台线程把内容溢出(spill)硬盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件),20M空间继续提供Map结果写入。【如果缓冲区满了,那么会被阻塞,直到写入磁盘过程完成】

    但是,在溢写到磁盘前,缓存中的数据会先被分区(partition)。默认采用Hash函数对key哈希后再用Reduce任务数量进行取模。这样就可以把Map结果均匀分配给R个Reduce任务并行处理。

    对于每个分区内的所有键值对,会根据key在内存中排序。然后对于排序的输出结果进行可选的合并(combiner,如:两个<”test”,1>就会合并成<”test”,2>。一般用于累加、最大值等场景。可以减少写到磁盘的数据和传递给reduce的数据)

    经过以上步骤,就会新建一个溢出文件(spill file)写入磁盘了。

  3. 文件归并 + 可选combiner

    每次溢写都会产生一个新的溢写文件,最终在Map任务全部结束之前,会对所有溢写文件中的数据进行归并成一个大的溢写文件(归并:两个<”test”,1>就会合并成<”test”,{1,1}>)

    如果至少存在3个溢出文件(可以设置)那么会再次调用combiner函数。(combiner函数可以在输入上反复运行而不影响最终结果,但如果map输出规模很小,那么不值得使用combiner带来的开销)

  4. 可选压缩

    压缩map输出到磁盘的过程中对它进行压缩往往是个很好的注意,默认不进行,可以设置。

ReduceTask阶段

  1. 数据复制阶段

    由于map任务完成的时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。

    reduce怎么知道去哪里取数据?map任务完成后会利用心跳机制通知application master,因此其知道map输出和主机之间的映射关系。reduce中的一个线程定期询问master来获取位置信息,直到获得所有的输出位置。由于reduce可能会失败,所以map输出不会立即删除,而是等application master来告之

    如果map输出相当小,则直接复制到缓冲区中。如果map输出非常大,则复制到磁盘。当内存缓冲区达到阈值大小后,则合并后写到磁盘中。如果有combiner,则在合并期间运行它。以降低写入磁盘的数据量。

  2. 归并数据

    随着磁盘上的副本增多,后台线程会将他们合并为更大的、排好序的N个文件。(注意,为了合并,压缩的map输出,要在内存中被解压。)

  3. 在数据进入reduce方法之前,还可以对数据进行分组,即相同key会被分到一组

  4. 把排好序的数据输入给Reduce函数,并产生最终结果,写入HDFS。

4.2 Partitioner分区

简介

我们知道,Map处理完数据,会往分区里面放,分区中不同区传给不同的reduce。而分区的个数是由reduce任务数决定的。Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法:

java
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}
// 按Key进行hash的分区函数
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
}
}

为什么要自定义分区函数?是为了解决数据倾斜(比如有3个reduce,第一个分区有98%的数据,第二个有2%,第三个0%,这样就造成了资源的不合理分配,浪费资源),合理的分配是有助于reduce计算的。

自定义

  1. 定义分区类

    java
    /**
    * YearPartitioner class
    * 分区类
    * 根据年份分区
    *
    * @author BoWenWang
    * @date 2020/2/21 16:38
    */
    public class YearPartitioner extends Partitioner<ComboKey, NullWritable> {
    @Override
    public int getPartition(ComboKey comboKey, NullWritable nullWritable, int i) {
    int year = comboKey.getYear();
    return year % i;
    }
    }
  2. 程序中配置使用分区类

    java
       //设置分区
    job.setPartitionerClass(YearPartitioner.class);
    // 设置reduce任务数量(要和Partitioner分数数量保持一致,多了会产生空文件,少了会报错)
    job.setNumReduceTasks(3);

4.3 Combiner 合成(可选组件)

简介

map的输出作为combiner的输入,combiner的输出作为reduce的输入。

不管调用多少次combiner,最终reduce的输出结果都是一样的。

combiner函数,可以理解为是一个小型reduce。combiner是一个优化方案。举一个例子来说:

Code
# 统计某年的最高气温
假设现在有个Map任务
第一个Map输出:
(1950, 0)
(1950, 20)
(1950, 10)
第二个Map输出:
(1950, 25)
(1950, 15)

1.按照传统方式的话,当运行到reduce的时候,输入如下:
(1950, [0,20,10,25,15])
2.使用combiner的话,会在Map段先进行一次处理,即找出每个Map的最大值,然后再传给reduce:
(1950, [20,25])

这样的好处是减少了Map和Reduce任务之间数据的传输量,提高速度

但是,combiner不能代替reduce。其使用场景是有限的,如统计最大值等。如果是求平均值的场景,那么每个Map的输出结果会先进行一次平均值计算,最后reduce再次计算两个结果的平均值,这会使结果出错。

其在Java中的使用,和reduce创建方式一样,唯一区别就是在Job类中设置combiner类 job.setCombinerClass(XXXX.class)

java
public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class);
}

自定义

  1. 定义一个类继承Reducer。

  2. 在主函数中调用方法

    java
    job.setCombinerClass(WcReduce.class);

4.4 排序对比器

简介

reduce前将Map传来的key-value对进行排序,可以自定义排序对比器。

自定义

  1. 定义对比器类(注意要有一个构造函数来注册)

    java
    /**
    * ComboKeyComparator class
    * 利用这个排序对比器来进行对自定义类的排序
    *
    * @author BoWenWang
    * @date 2020/2/21 18:08
    */
    public class ComboKeyComparator extends WritableComparator {
    // 注册
    protected ComboKeyComparator() {
    super(ComboKey.class, true);
    }

    // 因此ComboKey类实现了自己的比较方法,直接调用
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
    ComboKey o1 = (ComboKey)a;
    ComboKey o2 = (ComboKey)b;
    return o1.compareTo(o2);
    }
    }
  2. main函数中添加设置

    java
    // 设置排序对比起
    job.setSortComparatorClass(ComboKeyComparator.class);

4.5 分组

简介

对于在同一个reduce中的key-value对,相同的key被分到一组,即<key,< value1,value2,…,valueN>>形式。这里可以自定义分组。

自定义

  1. 定义分组类

    java
    /**
    * YearGroupingComparator class
    * 分组对比器:用于将一个reduce中不同key进行分组,相同的key将
    * 其值都放到一个iterator中
    *
    * @author BoWenWang
    * @date 2020/2/21 18:04
    */
    public class YearGroupingComparator extends WritableComparator {
    // 注册
    protected YearGroupingComparator() {
    super(ComboKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {

    ComboKey o1 = (ComboKey)a;
    ComboKey o2 = (ComboKey)b;
    return o1.getYear() - o2.getYear();
    }
    }
  2. 在main函数中配置

    java
    // 设置分组对比器
    job.setGroupingComparatorClass(YearGroupingComparator.class);

五、计数器

有时候我们需要在map和reduce的时候打印一些统计信息,但是将程序放到集群上运行时候,如果用sout来输出的话,是在不同机器上的logs文件下。因此我们需要一个计数器来进行统计。

  1. 添加计数器代码

    java
    context.getCounter("m", "WCMapper.map").increment(1);
  2. 程序运行结果输出报告:

    avatar

六、序列化(自定义类)

有时候原本的数据结构不够用了(如下面的二次排序),这时就需要我们定义自己需要的结果来进行编程操作。注意事项如下:

  1. 实现WritableComparable接口(相当于分别实现了Writable和Comparable接口)。
  2. 重写write、readFields序列化、但序列化方法,要注意顺序一致。
  3. 提供一个无参构造函数,用于反射。
  4. 重写compareTo方法,用于当把类当成key时的比较。
  5. 重写toString方法,当Reduce输出类型为自定义类时,可以以指定样式输出到文本文件。

代码示例:

java
/**
* ComboKey class
* 自定义组合累类(用于充当key)
* 相当于自定义序列化:
* 实现WritableComparable接口,相当于分别实现了Writable和Comparable接口
*
* @author BoWenWang
* @date 2020/2/21 16:26
*/
public class ComboKey implements WritableComparable<ComboKey> {
// 年份
private int year;
// 温度
private int temp;

// 要提供一个无参构造函数,用于反射
public ComboKey() {
}

public ComboKey(int year, int temp) {
this.year = year;
this.temp = temp;
}

public int getYear() {
return year;
}

public void setYear(int year) {
this.year = year;
}

public int getTemp() {
return temp;
}

public void setTemp(int temp) {
this.temp = temp;
}

/**
* 比较函数用于排序:
* 如果年份相同,温度降序;如果温度相同,年份升序
*/
@Override
public int compareTo(ComboKey o) {
if (o.getYear() == this.year) {
return o.getTemp() - this.temp;
} else {
return this.year - o.getYear();
}
}
/**
* 序列化
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.year);
dataOutput.writeInt(this.temp);
}
/**
* 反序列化
* 注意:
* 序列化和反序列化的顺序要一致
* 要提供一个无参构造函数,用于反射
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
this.year = dataInput.readInt();
this.temp = dataInput.readInt();
}

}

七、排序

7.1 全排序

全排序的实现方式:

  1. 定义一个reduce (适合数据量少,存在数据倾斜问题)

  2. 自定义分区类(手动按照key的值划分分区,存在数据倾斜问题)

  3. 使用Hadoop提供的TotalOrderPartitioner + (对inputFormat)RandomSampler(采样器)

    随机采样。抽取样本数,计算出区间值,将区间值写入分区文件,分区文件就是一个序列文件,这个序列文件只有key没有value(null)

    代码示例:

    java
    public class MaxTempApp {
    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS","file:///");
    Job job = Job.getInstance(conf);
    //设置job的各种属性
    job.setJobName("MaxTempApp"); //作业名称
    job.setJarByClass(MaxTempApp.class); //搜索类
    job.setInputFormatClass(SequenceFileInputFormat.class); //设置输入格式
    //添加输入路径
    FileInputFormat.addInputPath(job,new Path(args[0]));
    //设置输出路径
    FileOutputFormat.setOutputPath(job,new Path(args[1]));
    job.setMapperClass(MaxTempMapper.class); //mapper类
    job.setReducerClass(MaxTempReducer.class); //reducer类
    job.setMapOutputKeyClass(IntWritable.class); //
    job.setMapOutputValueClass(IntWritable.class); //
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class); //
    // -------------------------------------------------------------------
    // 创建随机采样器对象
    // freq:每个key被选中的概率
    // numSapmple:抽取样本的总数
    // maxSplitSampled:最大采样切片数
    InputSampler.Sampler<IntWritable, IntWritable> sampler =
    new InputSampler.RandomSampler<IntWritable, IntWritable>(1, 6000, 3);
    // 将sample数据写入分区文件.
    TotalOrderPartitioner.setPartitionFile(conf,new Path("file:///d:/mr/par.lst"));
    // 设置全排序分区类
    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setNumReduceTasks(3); //reduce个数
    InputSampler.writePartitionFile(job, sampler);

    //job.waitForCompletion(true);
    }
    }

    注意事项:

    1. TotalOrderPartitioner //全排序分区类,读取外部生成的分区文件确定区间。
    2. 使用时采样代码在最后端,否则会出现错误。
    3. 分区文件设置,设置的job的配置对象,不要是之前的conf.
      TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path(“d:/mr/par.lst”));
    4. 输入格式最好要序列文件,如果采用文本输入格式,采样的是偏移量那将没有意义

7.2 倒排序

K-V对调

7.3 二次排序

编程步骤:

  1. 数据准备

    Code
    2000 12
    2007 11
    2002 20
    2000 15
    2001 10
    2002 10
    2003 20
    2004 11
  2. 需求:需要对value排序

  3. 自定义key(代码序列化章节)

  4. 自定义分区类,按照年份分区(代码shuffle章节)

  5. 定义Key排序对比器(代码shuffle章节)

  6. 编写Mapper、Reduce、Main类

    java
    /**
    * YearTopTemp class
    * 每年最高温度
    *
    * @author BoWenWang
    * @date 2020/2/21 16:47
    */
    public class YearTopTemp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // 获取运行参数(其中至少一个输入路径以及一个输出路径)
    Configuration conf = new Configuration();
    String[] remainingArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
    if (remainingArgs.length < 2) {
    System.err.println("Usage: YearTopTemp <in> [<in>...] <out>");
    System.exit(2);
    }
    // 创建Job任务
    Job job = Job.getInstance(conf, "YearTopTemp");
    // 设置Jar包运行的类
    job.setJarByClass(YearTopTemp.class);
    // 设置Map和Reduce类
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReduce.class);
    // 设置reduce输出的键值对类型(这里其实map和reduce同时设置了)
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    // 设置map输出的键值对类型(如果map和reduce输出键值对类型一样,下面两个就不用设置)
    job.setMapOutputKeyClass(ComboKey.class);
    job.setMapOutputValueClass(NullWritable.class);
    // 指定原始数据存放在路径
    for (int i = 0; i < remainingArgs.length - 1; i++) {
    FileInputFormat.addInputPath(job, new Path(remainingArgs[i]));
    }
    // 指定处理输出数据存放路径
    FileOutputFormat.setOutputPath(job, new Path(remainingArgs[remainingArgs.length - 1]));
    //设置分区
    job.setPartitionerClass(YearPartitioner.class);
    job.setNumReduceTasks(3);
    // 设置排序对比器
    job.setSortComparatorClass(ComboKeyComparator.class);
    // 设置分组对比器
    job.setGroupingComparatorClass(YearGroupingComparator.class);
    // 将job提交给集群运行
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
    * Map类
    */
    public static class MyMapper extends Mapper<LongWritable, Text, ComboKey, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String[] words = line.split(" ");

    context.write(new ComboKey(Integer.parseInt(words[0]),Integer.parseInt(words[1])), NullWritable.get());
    }
    }

    /**
    * Reduce类
    */
    public static class MyReduce extends Reducer<ComboKey, NullWritable, IntWritable, IntWritable> {
    @Override
    protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    context.write(new IntWritable(key.getYear()), new IntWritable(key.getTemp()));
    }
    }
    }
文章作者: IT小王
文章链接: https://wangbowen.cn/2020/02/16/Hadoop%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%B8%89%EF%BC%89MapReduce/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 IT小王

评论