avatar

目录
Hadoop学习笔记(四)数据的输入与输出

Hadoop学习笔记(四)数据的输入与输出

先来看一张流程图:

avatar

我们可以看到在MR前后还有一些关于文件输入与输出的内容。首先是InputFormat用来接受输入文件,并对其进行切片。然后通过RecordReader对这些切片进行读取数据,产生KV对再传给MAP来处理。

一、压缩和解压缩

在分析split切片的时候,有看到一个“可压缩”关键字。其实,在进行切片的时候,会先判断这个文件是否是可压缩的,那么这个压缩有什么用?

java
// 判断是否可以切割代码片段
if (this.isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining;
int blkIndex;
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}

if (bytesRemaining != 0L) {
blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}
} else {
if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
}

1.1 进行压缩的原因

  1. 通过压缩来减少文件传输量的大小,把网络带宽的压力转移给了CPU(因为需要解压、解压缩)。

  2. 很多元数据,而且又不经常用,那么就可以进行压缩后再存储,节省磁盘空间。

1.2 安装snappy压缩库(centos7)【未成功】

如果安装不了snappy可以试试下面的LZO。(反正我是没有成功)

步骤参考:https://blog.csdn.net/qq_27078095/article/details/56865443【在hadoop2.X集群中安装压缩工具snappy(主要用于hbase)】

  1. 安装支持环境

    Code
    sudo yum -y install gcc gcc-c++ libtool cmake zlib-devel maven
    (如果maven很慢参考手动安装)

    安装MAVEN

    Code
    https://blog.51cto.com/13581826/2093965
  2. 下载基础安装包

    Code
    hadoop-2.6.0-cdh5.9.0-src.tar.gz(下载地址:http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0-src.tar.gz,也可下载二进制包,内包含src源码:hadoop-2.6.0-cdh5.9.0-tar.gz)

    snappy1.1.1.tar.gz(下载地址:http://pkgs.fedoraproject.org/repo/pkgs/snappy/snappy-1.1.1.tar.gz/8887e3b7253b22a31f5486bca3cbc1c2/snappy-1.1.1.tar.gz)

    protobuf-2.5.0.tar.gz(下载地址:https://github.com/google/protobuf/releases/tag/v2.5.0 建议选择2.5.0版本,不支持最新版本)
  3. 下载地址
    https://launchpad.net/ubuntu/+source/snappy/1.1.4-1
    
    Code

    4. 安装snappy
    $tar -zxvf snappy-1.1.1.tar.gz $cd snappy-1.1.1 $./configure $make $sudo make install $ll /usr/local/lib/ | grep snappy #查看snappy是否安装完成
    Code

    5. 安装protobuf(可能有点久)
    $tar -zxvf protobuf-2.5.0.tar.gz $cd protobuf-2.5.0 $./configure $make $sudo make install $protoc --version #验证安装 libprotoc 2.5.0
    Code

    6. 编译生成hadoop-native-Libraries(包括snappy)【据说很慢!原作者花了20小时?】
    $tar -zxvf hadoop-2.6.0-cdh5.9.0-src.tar.gz $cd hadoop-2.6.0-cdh5.9.0 $mvn package -DskipTests -Pdist,native -Dtar -Dsnappy.lib=/usr/local/lib -Dbundle.snappy
    Code

    编译成功后,snappy的so文件会生成在如下目录:
    hadoop-2.6.0-cdh5.9.0/hadoop-dist/target/hadoop-2.6.0-cdh5.9.0/lib/native
    Code

    将此目录下的文件拷贝到hadoop集群中的hadoop下的lib/native目录没有则新建,各节点均需拷贝。

    修改配置文件:
    $cat core-site.xml <property> <name>io.compression.codecs</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property>
    Code

    测试是否安装成功:
    $ hadoop checknative -a hadoop: true ...../hadoop-2.6.0-cdh5.9.0/lib/native/libhadoop.so zlib: true /usr/local/lib/libz.so.1 snappy: true ...../hadoop-2.6.0-cdh5.9.0/lib/native/libsnappy.so.1 lz4: true revision:10301 bzip2: false openssl: true /lib64/libcrypto.so
    Code



    ### 1.3 安装LZO库文件

    如果上面的snappy安装不了,可以试试LZO。但是Hadoop中没有LZO的类,所以我们要手动安装。

    1. 修改项目pom文件

    ```xml
    <dependency>
    <groupId>org.anarres.lzo</groupId>
    <artifactId>lzo-hadoop</artifactId>
    <version>1.0.0</version>
    <scope>compile</scope>
    </dependency>
  4. 在centos上安装lzo库

    Code
    $sudo yum -y install lzo
  5. 使用mvn命令下载工件中的所有依赖

    Code
    进入pom.xml所在目录,运行cmd:
    mvn -DoutputDirectory=./lib -DgroupId=XXX -DartifactId=模块 -Dversion=版本 dependency:copy-dependencies

    比如我自己的:
    mvn -DoutputDirectory=./lib -DgroupId=cn.wangbowen -DartifactId=Hadoop -Dversion=1.0-SNAPSHOT dependency:copy-dependencies
  6. 在项目lib下会生成项目依赖所有的第三方jar

  7. 找出lzo-hadoop.jar + lzo-core.jar复制到hadoop的响应目录下

    Code
    $cp lzo-core-1.0.0.jar /soft/hadoop/share/hadoop/common/lib/
    $cp lzo-hadoop-1.0.0.jar /soft/hadoop/share/hadoop/common/lib/

1.4 代码示例

java
package cn.wangbowen.hdfs.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.FileInputStream;
import java.io.FileOutputStream;

/**
* CompressUtils class
* 压缩解压缩工具类
*
* @author BoWenWang
* @date 2020/2/19 16:40
*/
public class CompressUtils {

// 需要压缩的文件路径[D:\tmp\a.txt]
private static final String ZIP_IN_FILE_PATH = "/home/wbw/tmp/a.txt";
// 压缩文件输出路径(不加后缀名)[D:\tmp\b]
private static final String ZIP_OUT_FILE_PATH = "/home/wbw/tmp/b";
// 需要解压的文件路径[D:\tmp\b]
private static final String UNZIP_IN_FILE_PATH = "/home/wbw/tmp/b";
// 输出解压文件路径(不加后缀名)[D:\tmp\c]
private static final String UNZIP_OUT_FILE_PATH = "/home/wbw/tmp/c";

public static void main(String[] args) throws Exception {
Class[] zipClasses = {
DeflateCodec.class,
GzipCodec.class,
BZip2Codec.class,
Lz4Codec.class,
//SnappyCodec.class
com.hadoop.compression.lzo.LzoCodec.class
};
System.out.println("=========== ZIP ===========");
for(Class c : zipClasses){
zip(c);
}
System.out.println("========== UNZIP ==========");
for(Class c : zipClasses){
unzip(c);
}

}

/**
* 压缩
*/
public static void zip(Class codecClass) throws Exception {
long start = System.currentTimeMillis();
// 实例化对象
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
// 创建文件输出流,得到默认扩展名
FileOutputStream fos = new FileOutputStream(ZIP_OUT_FILE_PATH + codec.getDefaultExtension());
// 得到压缩流Output
CompressionOutputStream zipOut = codec.createOutputStream(fos);
IOUtils.copyBytes(new FileInputStream(ZIP_IN_FILE_PATH), zipOut, 1024);
zipOut.close();
System.out.println("Zip[" + codecClass.getSimpleName() + "]: " + (System.currentTimeMillis() - start) + "ms");
}

/**
* 解压
*/
public static void unzip(Class codecClass) throws Exception {
long start = System.currentTimeMillis();
// 实例化对象
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, new Configuration());
// 创建文件输入流,得到默认扩展名
FileInputStream fis = new FileInputStream(UNZIP_IN_FILE_PATH + codec.getDefaultExtension());
// 得到压缩流Input
CompressionInputStream zipIn = codec.createInputStream(fis);
IOUtils.copyBytes(zipIn,new FileOutputStream(UNZIP_OUT_FILE_PATH + codec.getDefaultExtension() + ".txt"), 1024);
zipIn.close();
System.out.println("UnZip[" + codecClass.getSimpleName() + "]: " + (System.currentTimeMillis() - start) + "ms");
}
}

运行结果

Code
=========== ZIP ===========
2020-02-19 21:40:02,200 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
2020-02-19 21:40:02,278 INFO compress.CodecPool: Got brand-new compressor [.deflate]
Zip[DeflateCodec]: 541ms
2020-02-19 21:40:02,446 INFO compress.CodecPool: Got brand-new compressor [.gz]
Zip[GzipCodec]: 173ms
2020-02-19 21:40:02,626 WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
2020-02-19 21:40:02,627 INFO compress.CodecPool: Got brand-new compressor [.bz2]
Zip[BZip2Codec]: 9194ms
2020-02-19 21:40:11,828 INFO compress.CodecPool: Got brand-new compressor [.lz4]
Zip[Lz4Codec]: 136ms
Zip[LzoCodec]: 232ms
========== UNZIP ==========
2020-02-19 21:40:12,119 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
UnZip[DeflateCodec]: 122ms
2020-02-19 21:40:12,241 INFO compress.CodecPool: Got brand-new decompressor [.gz]
UnZip[GzipCodec]: 80ms
2020-02-19 21:40:12,356 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
UnZip[BZip2Codec]: 1200ms
2020-02-19 21:40:13,531 INFO compress.CodecPool: Got brand-new decompressor [.lz4]
UnZip[Lz4Codec]: 77ms
UnZip[LzoCodec]: 150ms

二、SequenceFile序列文件

2.1 SquenceFile简介

  • sequenceFile文件是Hadoop用来存储“二进制”形式的[Key,Value]对而设计的一种平面文件(Flat File)。
  • 可以把SequenceFile当做是一个容器,把所有的文件打包到SequenceFile类中可以高效的对小文件进行存储和处理。
  • SequenceFile文件并不按照其存储的Key进行排序存储,SequenceFile的内部类Writer提供了append功能。
  • SequenceFile中的Key和Value可以是任意类型Writable或者是自定义Writable。

2.2 同步标识

同步标识,用于快速定位到记录的边界。同时因为有同步点,因此可切割(splitable)。

每条Record(记录)以键值对的方式进行存储,用来表示它的字符数组可以一次解析成:记录的长度、Key的长度、Key值和value值,并且Value值的结构取决于该记录是否被压缩。

java
reader.sync(pos);	//定位到pos之后的第一个同步点
writer.sync(); //写入同步点

2.3 文件格式

​ 顺序文件由文件头和随后的一条或多条记录组成,顺序文件的前三个字节为SEQ(顺序文件代码),紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值类的名称、数据压缩细节、用户定义的元数据以及同步标识。如前所述,同步标识用于在读取文件时能够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标识,其值存储在文件头中。同步标识位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识(特别是比较短的记录)

​ 注意生成的文件不可查看,会乱码,但是可以通过:hdfs dfs -text filepath 来查看内容。

2.4 压缩方式

  • 不压缩:
  • record压缩:只压缩value
  • 块压缩:按照多个record形成一个block

推荐:https://blog.csdn.net/qq_33813365/article/details/82864241

2.5 代码示例

单元测试前置代码:

java
public class SeqFileTest {

private FileSystem fs;
private Configuration conf;
private Path filePath;

@Before
public void init() throws IOException {
conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
fs = FileSystem.get(conf);
filePath = new Path("d:/tmp/seq/1.seq");
}
}

写操作

java
/**
* 测试写入操作
*/
@Test
public void testWrite() throws IOException {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath, IntWritable.class, Text.class);
for (int i = 0; i < 5; i++) {
// 写入操作
writer.append(new IntWritable(i), new Text("tom" + i));
//每条记录后添加一个同步点
writer.sync();
}
for (int i = 0; i < 5; i++) {
writer.append(new IntWritable(i), new Text("tom" + i));
// 隔一个添加一个同步点
if (i % 2 == 0) {
writer.sync();
}
}
writer.close();
}

写操作(压缩)

java
/**
* 测试写入操作(压缩)
*/
@Test
public void testWriteGzip() throws IOException {
// 多了2个参数(一个是方式,一个是压缩类)
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath, IntWritable.class, Text.class,
SequenceFile.CompressionType.RECORD, new GzipCodec());
for (int i = 0; i < 5; i++) {
writer.append(new IntWritable(i), new Text("tom" + i));
}

writer.close();
}

读操作1(key, value)

java
/**
* 测试读操作
* 方式一:通过 reader.next(key, value)
*/
@Test
public void testRead() throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
IntWritable key = new IntWritable();
Text value = new Text();

while (reader.next(key, value)) {
System.out.println(key.get() + ":" + value.toString());
}

reader.close();
}

读操作2(key)

java
/**
* 测试读操作
* 方式二:通过 reader.next(key)
*/
@Test
public void testRead2() throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
IntWritable key = new IntWritable();
Text value = new Text();
while (reader.next(key)) {
reader.getCurrentValue(value);
System.out.println(key.get() + ":" + value.toString());
}

reader.close();
}

读操作3(seek)

java
/**
* 测试读操作
* 方式三:通过 reader.seek(pos)
* 先通过方式一,调用reader.getPosition()获取每条记录的偏移量。
* (本次测试偏移量为:153,198,243,288,333,378,423,448,493,518)
*/
@Test
public void testRead3() throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
IntWritable key = new IntWritable();
Text value = new Text();
// 243为记录起始位置:运行结果成功!
reader.seek(243);
reader.next(key, value);
System.out.println(value.toString());
// 245为记录中间位置:运行结果失败,抛出异常!
reader.seek(245);
reader.next(key, value);

reader.close();
}

由上面代码可以看出,seek()定位操作,只能刚好定位到一条记录的起始位置,不然调用next()时候会抛异常。

同时,可以看到前面中间偏移量间隔是45字节,后面有部分是25字节。应该是检查点占了20字节。

读操作4(sync)

java
/**
* 测试读操作
* 方式四:通过 reader.sync(pos)
* 先通过方式一,调用reader.getPosition()获取每条记录的偏移量。
* (本次测试偏移量为:153,198,243,288,333,378,423,448,493,518)
*/
@Test
public void testRead4() throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
IntWritable key = new IntWritable();
Text value = new Text();
// 155为偏移量中间位置,结果不会报错
reader.sync(155);
reader.next(key, value);
System.out.println(value.toString());

reader.close();
}

三、MapFile

3.1 简介

一个MapFile可以通过SequenceFile的地址,进行分类查找的格式。

使用这个格式的优点在于:

  • 与SequenceFile只生成一个文件不同,MapFile生成一个目录,目录下有index和data文件,都是序列文件。

    avatar
    可以看到index文件里面左边的是记录的索引,右边的代表数据在data文件的位置。

  • 首先会将SequenceFile中的地址都加载入内存,并且进行了key升序写入(可重复)。

  • index文件划分key区间,用于快速定位,从而提供更快的数据查找。

  • 索引模型按128个键建立的(可以看到上面图中左边一列以128递增),可以通过io.map.index.interval来修改

缺点:

  • 文件不支持复写操作,不能向已存在的SequenceFile(MapFile)追加存储记录
  • 当write流不关闭的时候,没有办法构造read流。也就是在执行文件写操作的时候,该文件是不可读取的

3.2 使用

  • MapFile和SequenceFile使用一样只要把类名替换,把输出路径由具体文件名改为目录就行。
  • 查看文件内容也是要命令(hdfs dfs -text filepath)

四、MR程序文件输入输出格式

4.1 设置单文件格式输入输出类型

介绍了多种文件输入类型后,我们知道输入和输出可以有不同的格式。其中我们拿文件格式SequenceFile来举例:

  1. 首先我们修改之前的单词计数程序,增加指定格式文件输入输出

    java
    // 设置文件输入/输出格式
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
  2. 然后运行程序,将输出文件放到HDFS上查看

    avatar

    可以发现,输出改为SequenceFile后,用普通的-cat无法查看,而-text可以

4.2 MultipleInputs多文件格式数据输入源

如果输入数据文件有多种混合类型,可以用MultipleInputs.addInputPath()进行操作。

java
//多个输入
MultipleInputs.addInputPath(job,new Path("file:///d:/mr/txt"),TextInputFormat.class, WCTextMapper.class);
MultipleInputs.addInputPath(job,new Path("file:///d:/mr/seq"), SequenceFileInputFormat.class,WCSeqMapper.class);

五、数据输入文件的处理

5.1 InputFormat

  1. 获取切片集合

  2. 子类都要重写方法isSplittable()

  3. 负责创建RecordReader对象

    avatar

    可以看到对于不同类型的InputFormat其都有自己的Reader。

  4. 设置IO路径

5.2 Split切片

分片不包含数据本身,而是指向数据的引用。

  1. 我们根据上次的JOB提交分析,可以留意到在job提交过程中有一个方法:

    java
    int maps = this.writeSplits(job, submitJobDir);
  2. 单步调试进入该方法

    java
    private int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    // 得到配置信息
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    // 获取一个新的map
    if (jConf.getUseNewMapper()) {
    maps = this.writeNewSplits(job, jobSubmitDir);
    } else {
    maps = this.writeOldSplits(jConf, jobSubmitDir);
    }

    return maps;
    }
  3. 单步进入this.writeNewSplits方法

    java
    private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input = (InputFormat)ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // 这里有一个获取切片的方法,返回一个切片集合
    // InputSplit(输入切片):代表了要被单个map处理的数据,每一个map()操作只处理一个输入切片。
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (InputSplit[])((InputSplit[])splits.toArray(new InputSplit[splits.size()]));
    Arrays.sort(array, new JobSubmitter.SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
    return array.length;
    }
  4. 单步进入input.getSplits方法

    java
    public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = (new StopWatch()).start();
    // this.getFormatMinSplitSize()获取格式化切片最小大小,返回的是一个固定的值:1L
    /*
    public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
    }
    可以看到如果没有配置“mapreduce.input.fileinputformat.split.minsize”属性,默认值1L。
    去集群上查看一下(输入命令后发现,返回的是0):
    [wbw@s201 /home/wbw]$hdfs getconf -confKey mapreduce.input.fileinputformat.split.minsize
    0
    */
    // 所以minSize一定 >=1
    long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
    /*
    public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
    }
    去集群上查看一下(发现没有,那么取默认值9223372036854775807L):
    [wbw@s201 /home/wbw]$hdfs getconf -confKey mapreduce.input.fileinputformat.split.maxsize
    Configuration mapreduce.input.fileinputformat.split.maxsize is missing.
    */
    // 所以maxSize一定 <= Long.Max
    long maxSize = getMaxSplitSize(job);
    List<InputSplit> splits = new ArrayList();
    List<FileStatus> files = this.listStatus(job);
    // ...
    while(true) {
    while(true) {
    while(true) {
    FileStatus file;
    do {
    if (!var10.hasNext()) {
    // ...
    return splits;
    }
    file = (FileStatus)var10.next();
    } while(ignoreDirs && file.isDirectory());

    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0L) {
    // ...
    // isSplitable:如果是压缩文件,那么不可以切
    if (this.isSplitable(job, path)) {
    // 获取block块大小
    long blockSize = file.getBlockSize();
    /*
    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
    }
    */
    // 计算切片大小:即在块大小、最大(小)切片中取中间值
    long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
    // ...
    splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
    }
    } else {
    splits.add(this.makeSplit(path, 0L, length, new String[0]));
    }
    }
    }
    }
    }
  5. 我们修改一下words.txt内容(查看属性,发现有49个字节。其中45个字母,有4个是回车换行):

    Code
    hello world tom			15 + 2(\r\n)
    tom hello world 15 + 2
    world tom hello 15

    再修改WordCount类,手动添加指定切片大小(一个块默认128M取中间值后,所以切片大小应该是13):

    java
    // 设置最大最小切片
    FileInputFormat.setMaxInputSplitSize(job, 13L);
    FileInputFormat.setMinInputSplitSize(job, 1L);

    接着重新debug程序:

    avatar

    可以看到,文件被切成了4片,每一片大小13。但是如果切13个字节的话,一行文本就被断掉了,数据就有问题了。

    avatar

    avatar

    avatar

    可以看到实际上,并没有,依然是一行一条。这就涉及到RecordReader阅读器了。

5.3 RecordReader读取法则

InputSplit描述了数据块的切分方式,RecordReader类则是实际用来加载split分片数据,并把数据转换为适合Mapper类里面map()方法处理的<key, value>形式。

接着 3.3 我们打断点,一步一步进入最后发现到了一个LineRecordReader类中的this.in.readLine方法:

java
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
/*断点*/ while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
// 打断点后发现,如果nextKeyValue返回的是false就会执行cleanup
this.cleanup(context);
}
}
java
/*
截取自:https://blog.51cto.com/luchunli/1718322

LineRecordReader类由一个FileSplit构造出来,start是这个FileSplit的起始位置,pos是当前读取分片的位置,end是分片结束位置,in是打开的一个读取这个分片的输入流,它是使用这个FileSplit对应的文件名来打开的。
*/
public class LineRecordReader extends RecordReader<LongWritable, Text> {
public void initialize(InputSplit genericSplit,TaskAttemptContext context)
throws IOException {
// 1. 接收split(FileSplit对象)分片,并通过分片解析出:
// 分片起始位置:start = split.getStart();
// 结束位置:end = start + split.getLength();
// 文件位置:在HDFS上的绝对路径final Path file = split.getPath();
// 2. 获取文件的输入流
// 通过FileSystem获取文件,并获取输入流 fileIn = fs.open(file);
// 3. 判定是否为压缩文件,并获取压缩格式
// CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
// 4. 计算行偏移量(原始解释如下)
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}

/*
* 最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。
*/
public boolean nextKeyValue() throws IOException {
// key-->这里为map task中map()函数的key
if (this.key == null) {
this.key = new LongWritable();
}
// 取的是行偏移量
this.key.set(this.pos);
if (this.value == null) {
this.value = new Text();
}

int newSize = 0;
// 功能:多读取一些数据,补充完整的一行
while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {
if (this.pos == 0L) {
newSize = this.skipUtfByteOrderMark();
} else {
// 判定split是否已经读取解析完成,如果未完成的话就读取一行数据
// 通过org.apache.hadoop.util.LineReader的readCustomLine或readDefaultLine读取
// 如果指定了行分隔符则调用readCustomLine;
// 否则默认通过回车换行作为分隔符调用readDefaultLine
newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));
// 偏移量加,上个读取的行的长度,作为下一行的偏移量
this.pos += (long)newSize;
}

if (newSize == 0 || newSize < this.maxLineLength) {
break;
}

LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize));
}

if (newSize == 0) {
this.key = null;
this.value = null;
return false;
} else {
return true;
}
}
}

结论:

Code
这里有个很重要的类Recordreader,inputformat到map中间有一个reader,你切片可以这样定义(就是代码定义的13),但是reader要去读数据,他会首先判断读取的位置是不是行首,若是,则会一直读到回车换行;若不是行首,它会从下一行开始读。这就是为什么我们第二个切片内容会是一个完整的行内容,而不是从第一行的 om 开始读。

所以我们虽然有四个切片,但是我们只有前三个split有数据,最后一个split是空的,因为我们只有三行数据,依照reader的读取数据法则,到最后一个split的时候我们已近没有数据可读了。

这里我们可能会问,split和reader这到底听谁的?或者他们俩的功能感觉差不多?

假设split的大小是128M,我们读了n行数据,在第n行读了一半,如果不读剩下的,会丢数据,这里可能会说,下一个切片把数据读走再分析,但是在并发情况下,切片很大可能在不同节点上运行的,怎么把这两个数据对接在一起再分析。所以这时候就需要reader了,就算我们split值满了,我们还是要把这行读完。(reader是一行一行的把数据发给mapper的)

切片是定义大方向的,而这个reader是处理细节,让你不丢失数据,或者数据不错乱。
Code
那么一行数据,可能在不同的splits中,也可能在不同的block中。

在不同的block中呢,这个有fileIn对象帮我们处理的了,主要是读取read到缓冲区,属于物理上的问题,不是考虑的地方。

处于不同的split呢?这个情况有些问题,因为不同的split就是不同的划分,并且由不同的map task执行。

那么我们recordreader如何解决这个问题呢?

解决办法便是,突破split的start和end限制。

linerecordreader的解决办法:

只要start指向的位置不是文件的第一行,则默认的过滤掉一行(start位置可能是一行中的某一个位置,比如本例子切片定义的13,即第一行t/om之间)。

在nextKeyvalue方法中,多读取一些数据,补充完整的一行。

OK,通过过滤掉一行,和多读取一行,就能保证被split分隔的一行,能够完成的读取,同时也不会重复处理一些数据。因为,所有的mapTask的linerecordreader都遵循这个方法。

六、数据库读取与写入

数据的输入不仅仅可以从常用的TextInputFormat进行,还可以直接从数据库中读取和写入。

6.1 数据库准备

sql
create database big4 ;
use big4 ;
create table words(id int primary key auto_increment , name varchar(20) , txt varchar(255));

insert into words(name,txt) values('tomas','hello world tom');
insert into words(txt) values('hello tom world');
insert into words(txt) values('world hello tom');
insert into words(txt) values('world tom hello');

create table stats(word varchar(50),wordCount int);

6.2 自定义数据对象类

首先我们回顾一下jdbc程序的读取/写入操作,关键是setXxx()方法和getXxx()方法。

  • jdbc写操作

    java
    Class.forName("com.mysql.jdbc.Driver");
    Connection conn = DriverMananger.getConnection("jdbc:mysql://localhost:3306/big4","root","root");
    PreparedStatement ppst = conn.preparedStatement("insert into test(id,name,age) values(?,?,?)");
    //绑定参数
    ppst.setInteger(1,1);
    ppst.setInteger(2,"tom");
    ppst.setInteger(3,12);

    ppst.executeUpdate();
    ppst.close();
    conn.close();
  • jdbc读操作

    java
    Class.forName("com.mysql.jdbc.Driver");
    Connection conn = DriverMananger.getConnection("jdbc:mysql://localhost:3306/big4","root","root");

    ppst = conn.preparedStatement("select id,name from test ");
    ResultSet rs = ppst.executeQuery();
    while(rs.next()){
    int id = rs.getInt("id");
    String name = rs.getInt("name");
    }
    rs.close();
    conn.close();

然后,我们开始编写自定义数据对象类:

java
/**
* MyDBWritable class
* 自定义数据对象类
* 实现DBWritable接口:由JDBC可以知道将数据读取/写入到数据库的时候,需要对预处理
* 进行setXxx(),对返回的结果集进行getXxx()来吧数据写入到SQL语句,或者将返
* 回结果写入对象。
* 实现Writable接口:即上一章中的Hadoop中的序列化,来适配MR程序的输入输出。
*
* @author BoWenWang
* @date 2020/2/24 18:09
*/
public class MyDBWritable implements DBWritable, Writable {

// 读表所需参数
private int id;
private String name;
private String txt;

// 写表所需参数
private String word;
private int wordCount;

public MyDBWritable() {
}

// 省略get/set方法...

// MR
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(id);
dataOutput.writeUTF(name);
dataOutput.writeUTF(txt);
dataOutput.writeUTF(word);
dataOutput.writeInt(wordCount);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
id = dataInput.readInt();
name = dataInput.readUTF();
txt = dataInput.readUTF();
word = dataInput.readUTF();
wordCount = dataInput.readInt();
}

// DB
/**
* 写入数据库的只有2个参数
*/
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
// 这里注意顺序!
preparedStatement.setString(1, word);
preparedStatement.setInt(2, wordCount);
}

/**
* 从数据库中读取数据,仅需3个参数
*/
@Override
public void readFields(ResultSet resultSet) throws SQLException {
id = resultSet.getInt(1);
name = resultSet.getString(2);
txt = resultSet.getString(3);
}
}

6.3 编写Mapper和Reducer

Mapper

java
/**
* WCMapper class
* 这里的输入KV类型是从数据库读取的,以LongWritable为key,自定义类为value
* 注意这里的value,已经是将数据库中取到的记录封装到对象中了
*
* @author BoWenWang
* @date 2020/2/24 18:27
*/
public class WCMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable> {
private static final IntWritable SUM = new IntWritable(1);

@Override
protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
// 数据库字段txt为文本内容
String line = value.getTxt();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word), SUM);
}
}
}

Reduce

java
/**
* WCReader class
* 这里注意一点,通过对源码的跟踪发现:这里输出的value是泛型,且没有地方用到
* 也就是说这个值是没有用的,因此用NullWritable,而Key就是我们的自定类,最后
* 会将我们的自定义类,根据重写的write写入数据库。
*
* @author BoWenWang
* @date 2020/2/24 21:15
*/
public class WCReducer extends Reducer<Text, IntWritable, MyDBWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count++;
}
MyDBWritable rs = new MyDBWritable();
rs.setWord(key.toString());
rs.setWordCount(count);
context.write(rs, NullWritable.get());
}
}

6.4 编写App启动类

java
/**
* DBWCApp class
*
* @author BoWenWang
* @date 2020/2/24 21:23
*/
public class DBWCApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置JOB属性
job.setJobName("DBWCApp");
job.setJarByClass(DBWCApp.class);
// 配置数据库连接信息
String driverClass = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/big4";
String username = "root";
String password = "root";
// 将数据库配置写入job的配置属性中
DBConfiguration.configureDB(job.getConfiguration(), driverClass, url, username, password);
// 设置数据输入,参数介绍:
// 1、2:作业、和自定义类(因为里面重写了数据库读出和写入的函数)
// 3:数据库查询语句
// 4:表记录数(根据这个数量来计算切片数量)
DBInputFormat.setInput(job, MyDBWritable.class,
"select id,name,txt from words", "select count(*) from words");
// 设置数据输出,参数介绍:
// 1:作业
// 2:数据输出表名
// 3:字段名(要和自定义类重写的write方法字段顺序一致)
DBOutputFormat.setOutput(job, "stats", "word", "wordCount");
// 设置Map、Reduce
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
// 设置输出KV类型
job.setOutputKeyClass(MyDBWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 提交作业
job.waitForCompletion(true);
}
}

6.5 添加pom依赖

xml
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

6.6 本地运行

avatar

数据写入成功。

6.7 集群运行

  1. 因为我们程序用到了mysql驱动类,而Hadoop中没有!所以我们要利用之前的方法,把mysql-connector-java-X.X.X.jar包放到Hadoop的lib目录下。【可以参照1.3内容】

  2. 分发jar包到所有节点的lib目录下,因为不知道程序实际运行在哪个节点上。

  3. 修改程序中的mysql的url地址,改成本地IP。

    Code
    进入cmd命令界面,输入ipconfig,找到IPV4地址,如我的:192.168.174.1
  4. 清空数据库输出表内容。

  5. 利用MAVEN打jar包上传。

  6. 启动集群,运行jar包。

    Code
    hadoop jar Hadoop-1.0-SNAPSHOT.jar cn.wangbowen.mr.db.DBWCApp
  7. 可能会报错不被允许连接数据库,这时候要对本地数据库进行一些设置:

    可以参考 https://blog.csdn.net/yang5726685/article/details/52529082

  8. 查看WEB-UI 8088端口,发现有一个作业正常运行,等一会查看数据库。发现数据和本地的效果一致。

    avatar

文章作者: IT小王
文章链接: https://wangbowen.cn/2020/02/24/Hadoop%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E5%9B%9B%EF%BC%89%E6%95%B0%E6%8D%AE%E7%9A%84%E8%BE%93%E5%85%A5%E4%B8%8E%E8%BE%93%E5%87%BA/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 IT小王

评论