Hadoop学习笔记(四)数据的输入与输出
先来看一张流程图:
我们可以看到在MR前后还有一些关于文件输入与输出的内容。首先是InputFormat用来接受输入文件,并对其进行切片。然后通过RecordReader对这些切片进行读取数据,产生KV对再传给MAP来处理。
一、压缩和解压缩
在分析split切片的时候,有看到一个“可压缩”关键字。其实,在进行切片的时候,会先判断这个文件是否是可压缩的,那么这个压缩有什么用?
// 判断是否可以切割代码片段 |
1.1 进行压缩的原因
通过压缩来减少文件传输量的大小,把网络带宽的压力转移给了CPU(因为需要解压、解压缩)。
很多元数据,而且又不经常用,那么就可以进行压缩后再存储,节省磁盘空间。
1.2 安装snappy压缩库(centos7)【未成功】
如果安装不了snappy可以试试下面的LZO。(反正我是没有成功)
步骤参考:https://blog.csdn.net/qq_27078095/article/details/56865443【在hadoop2.X集群中安装压缩工具snappy(主要用于hbase)】
安装支持环境
Codesudo yum -y install gcc gcc-c++ libtool cmake zlib-devel maven
(如果maven很慢参考手动安装)安装MAVEN
Codehttps://blog.51cto.com/13581826/2093965
下载基础安装包
Codehadoop-2.6.0-cdh5.9.0-src.tar.gz(下载地址:http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0-src.tar.gz,也可下载二进制包,内包含src源码:hadoop-2.6.0-cdh5.9.0-tar.gz)
snappy1.1.1.tar.gz(下载地址:http://pkgs.fedoraproject.org/repo/pkgs/snappy/snappy-1.1.1.tar.gz/8887e3b7253b22a31f5486bca3cbc1c2/snappy-1.1.1.tar.gz)
protobuf-2.5.0.tar.gz(下载地址:https://github.com/google/protobuf/releases/tag/v2.5.0 建议选择2.5.0版本,不支持最新版本)下载地址 https://launchpad.net/ubuntu/+source/snappy/1.1.4-1
$tar -zxvf snappy-1.1.1.tar.gz $cd snappy-1.1.1 $./configure $make $sudo make install $ll /usr/local/lib/ | grep snappy #查看snappy是否安装完成Code
4. 安装snappy$tar -zxvf protobuf-2.5.0.tar.gz $cd protobuf-2.5.0 $./configure $make $sudo make install $protoc --version #验证安装 libprotoc 2.5.0Code
5. 安装protobuf(可能有点久)$tar -zxvf hadoop-2.6.0-cdh5.9.0-src.tar.gz $cd hadoop-2.6.0-cdh5.9.0 $mvn package -DskipTests -Pdist,native -Dtar -Dsnappy.lib=/usr/local/lib -Dbundle.snappyCode
6. 编译生成hadoop-native-Libraries(包括snappy)【据说很慢!原作者花了20小时?】hadoop-2.6.0-cdh5.9.0/hadoop-dist/target/hadoop-2.6.0-cdh5.9.0/lib/nativeCode
编译成功后,snappy的so文件会生成在如下目录:$cat core-site.xml <property> <name>io.compression.codecs</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property>Code
将此目录下的文件拷贝到hadoop集群中的hadoop下的lib/native目录没有则新建,各节点均需拷贝。
修改配置文件:$ hadoop checknative -a hadoop: true ...../hadoop-2.6.0-cdh5.9.0/lib/native/libhadoop.so zlib: true /usr/local/lib/libz.so.1 snappy: true ...../hadoop-2.6.0-cdh5.9.0/lib/native/libsnappy.so.1 lz4: true revision:10301 bzip2: false openssl: true /lib64/libcrypto.soCode
测试是否安装成功:Code
### 1.3 安装LZO库文件
如果上面的snappy安装不了,可以试试LZO。但是Hadoop中没有LZO的类,所以我们要手动安装。
1. 修改项目pom文件
```xml
<dependency>
<groupId>org.anarres.lzo</groupId>
<artifactId>lzo-hadoop</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>在centos上安装lzo库
Code$sudo yum -y install lzo
使用mvn命令下载工件中的所有依赖
Code进入pom.xml所在目录,运行cmd:
mvn -DoutputDirectory=./lib -DgroupId=XXX -DartifactId=模块 -Dversion=版本 dependency:copy-dependencies
比如我自己的:
mvn -DoutputDirectory=./lib -DgroupId=cn.wangbowen -DartifactId=Hadoop -Dversion=1.0-SNAPSHOT dependency:copy-dependencies在项目lib下会生成项目依赖所有的第三方jar
找出lzo-hadoop.jar + lzo-core.jar复制到hadoop的响应目录下
Code$cp lzo-core-1.0.0.jar /soft/hadoop/share/hadoop/common/lib/
$cp lzo-hadoop-1.0.0.jar /soft/hadoop/share/hadoop/common/lib/
1.4 代码示例
package cn.wangbowen.hdfs.utils; |
运行结果
=========== ZIP =========== |
二、SequenceFile序列文件
2.1 SquenceFile简介
- sequenceFile文件是Hadoop用来存储“二进制”形式的[Key,Value]对而设计的一种平面文件(Flat File)。
- 可以把SequenceFile当做是一个容器,把所有的文件打包到SequenceFile类中可以高效的对小文件进行存储和处理。
- SequenceFile文件并不按照其存储的Key进行排序存储,SequenceFile的内部类Writer提供了append功能。
- SequenceFile中的Key和Value可以是任意类型Writable或者是自定义Writable。
2.2 同步标识
同步标识,用于快速定位到记录的边界。同时因为有同步点,因此可切割(splitable)。
每条Record(记录)以键值对的方式进行存储,用来表示它的字符数组可以一次解析成:记录的长度、Key的长度、Key值和value值,并且Value值的结构取决于该记录是否被压缩。
reader.sync(pos); //定位到pos之后的第一个同步点 |
2.3 文件格式
顺序文件由文件头和随后的一条或多条记录组成,顺序文件的前三个字节为SEQ(顺序文件代码),紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值类的名称、数据压缩细节、用户定义的元数据以及同步标识。如前所述,同步标识用于在读取文件时能够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标识,其值存储在文件头中。同步标识位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识(特别是比较短的记录)
注意生成的文件不可查看,会乱码,但是可以通过:hdfs dfs -text filepath 来查看内容。
2.4 压缩方式
- 不压缩:
- record压缩:只压缩value
- 块压缩:按照多个record形成一个block
推荐:https://blog.csdn.net/qq_33813365/article/details/82864241
2.5 代码示例
单元测试前置代码:
public class SeqFileTest { |
写操作
/** |
写操作(压缩)
/** |
读操作1(key, value)
/** |
读操作2(key)
/** |
读操作3(seek)
/** |
由上面代码可以看出,seek()定位操作,只能刚好定位到一条记录的起始位置,不然调用next()时候会抛异常。
同时,可以看到前面中间偏移量间隔是45字节,后面有部分是25字节。应该是检查点占了20字节。
读操作4(sync)
/** |
三、MapFile
3.1 简介
一个MapFile可以通过SequenceFile的地址,进行分类查找的格式。
使用这个格式的优点在于:
与SequenceFile只生成一个文件不同,MapFile生成一个目录,目录下有index和data文件,都是序列文件。
首先会将SequenceFile中的地址都加载入内存,并且进行了key升序写入(可重复)。
index文件划分key区间,用于快速定位,从而提供更快的数据查找。
索引模型按128个键建立的(可以看到上面图中左边一列以128递增),可以通过io.map.index.interval来修改
缺点:
- 文件不支持复写操作,不能向已存在的SequenceFile(MapFile)追加存储记录
- 当write流不关闭的时候,没有办法构造read流。也就是在执行文件写操作的时候,该文件是不可读取的
3.2 使用
- MapFile和SequenceFile使用一样只要把类名替换,把输出路径由具体文件名改为目录就行。
- 查看文件内容也是要命令(hdfs dfs -text filepath)
四、MR程序文件输入输出格式
4.1 设置单文件格式输入输出类型
介绍了多种文件输入类型后,我们知道输入和输出可以有不同的格式。其中我们拿文件格式SequenceFile来举例:
首先我们修改之前的单词计数程序,增加指定格式文件输入输出
java// 设置文件输入/输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);然后运行程序,将输出文件放到HDFS上查看
可以发现,输出改为SequenceFile后,用普通的-cat无法查看,而-text可以
4.2 MultipleInputs多文件格式数据输入源
如果输入数据文件有多种混合类型,可以用MultipleInputs.addInputPath()进行操作。
//多个输入 |
五、数据输入文件的处理
5.1 InputFormat
5.2 Split切片
分片不包含数据本身,而是指向数据的引用。
我们根据上次的JOB提交分析,可以留意到在job提交过程中有一个方法:
javaint maps = this.writeSplits(job, submitJobDir);
单步调试进入该方法
javaprivate int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
// 得到配置信息
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
// 获取一个新的map
if (jConf.getUseNewMapper()) {
maps = this.writeNewSplits(job, jobSubmitDir);
} else {
maps = this.writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}单步进入this.writeNewSplits方法
javaprivate <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = (InputFormat)ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// 这里有一个获取切片的方法,返回一个切片集合
// InputSplit(输入切片):代表了要被单个map处理的数据,每一个map()操作只处理一个输入切片。
List<InputSplit> splits = input.getSplits(job);
T[] array = (InputSplit[])((InputSplit[])splits.toArray(new InputSplit[splits.size()]));
Arrays.sort(array, new JobSubmitter.SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
return array.length;
}单步进入input.getSplits方法
javapublic List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = (new StopWatch()).start();
// this.getFormatMinSplitSize()获取格式化切片最小大小,返回的是一个固定的值:1L
/*
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
}
可以看到如果没有配置“mapreduce.input.fileinputformat.split.minsize”属性,默认值1L。
去集群上查看一下(输入命令后发现,返回的是0):
[wbw@s201 /home/wbw]$hdfs getconf -confKey mapreduce.input.fileinputformat.split.minsize
0
*/
// 所以minSize一定 >=1
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
/*
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
}
去集群上查看一下(发现没有,那么取默认值9223372036854775807L):
[wbw@s201 /home/wbw]$hdfs getconf -confKey mapreduce.input.fileinputformat.split.maxsize
Configuration mapreduce.input.fileinputformat.split.maxsize is missing.
*/
// 所以maxSize一定 <= Long.Max
long maxSize = getMaxSplitSize(job);
List<InputSplit> splits = new ArrayList();
List<FileStatus> files = this.listStatus(job);
// ...
while(true) {
while(true) {
while(true) {
FileStatus file;
do {
if (!var10.hasNext()) {
// ...
return splits;
}
file = (FileStatus)var10.next();
} while(ignoreDirs && file.isDirectory());
Path path = file.getPath();
long length = file.getLen();
if (length != 0L) {
// ...
// isSplitable:如果是压缩文件,那么不可以切
if (this.isSplitable(job, path)) {
// 获取block块大小
long blockSize = file.getBlockSize();
/*
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
*/
// 计算切片大小:即在块大小、最大(小)切片中取中间值
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
// ...
splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
}
} else {
splits.add(this.makeSplit(path, 0L, length, new String[0]));
}
}
}
}
}我们修改一下words.txt内容(查看属性,发现有49个字节。其中45个字母,有4个是回车换行):
Codehello world tom 15 + 2(\r\n)
tom hello world 15 + 2
world tom hello 15再修改WordCount类,手动添加指定切片大小(一个块默认128M取中间值后,所以切片大小应该是13):
java// 设置最大最小切片
FileInputFormat.setMaxInputSplitSize(job, 13L);
FileInputFormat.setMinInputSplitSize(job, 1L);接着重新debug程序:
可以看到,文件被切成了4片,每一片大小13。但是如果切13个字节的话,一行文本就被断掉了,数据就有问题了。
可以看到实际上,并没有,依然是一行一条。这就涉及到RecordReader阅读器了。
5.3 RecordReader读取法则
InputSplit描述了数据块的切分方式,RecordReader类则是实际用来加载split分片数据,并把数据转换为适合Mapper类里面map()方法处理的<key, value>形式。
接着 3.3 我们打断点,一步一步进入最后发现到了一个LineRecordReader类中的this.in.readLine方法:
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { |
/* |
结论:
这里有个很重要的类Recordreader,inputformat到map中间有一个reader,你切片可以这样定义(就是代码定义的13),但是reader要去读数据,他会首先判断读取的位置是不是行首,若是,则会一直读到回车换行;若不是行首,它会从下一行开始读。这就是为什么我们第二个切片内容会是一个完整的行内容,而不是从第一行的 om 开始读。 |
那么一行数据,可能在不同的splits中,也可能在不同的block中。 |
六、数据库读取与写入
数据的输入不仅仅可以从常用的TextInputFormat进行,还可以直接从数据库中读取和写入。
6.1 数据库准备
create database big4 ; |
6.2 自定义数据对象类
首先我们回顾一下jdbc程序的读取/写入操作,关键是setXxx()方法和getXxx()方法。
jdbc写操作
javaClass.forName("com.mysql.jdbc.Driver");
Connection conn = DriverMananger.getConnection("jdbc:mysql://localhost:3306/big4","root","root");
PreparedStatement ppst = conn.preparedStatement("insert into test(id,name,age) values(?,?,?)");
//绑定参数
ppst.setInteger(1,1);
ppst.setInteger(2,"tom");
ppst.setInteger(3,12);
ppst.executeUpdate();
ppst.close();
conn.close();jdbc读操作
javaClass.forName("com.mysql.jdbc.Driver");
Connection conn = DriverMananger.getConnection("jdbc:mysql://localhost:3306/big4","root","root");
ppst = conn.preparedStatement("select id,name from test ");
ResultSet rs = ppst.executeQuery();
while(rs.next()){
int id = rs.getInt("id");
String name = rs.getInt("name");
}
rs.close();
conn.close();
然后,我们开始编写自定义数据对象类:
/** |
6.3 编写Mapper和Reducer
Mapper
/** |
Reduce
/** |
6.4 编写App启动类
/** |
6.5 添加pom依赖
<dependency> |
6.6 本地运行
数据写入成功。
6.7 集群运行
因为我们程序用到了mysql驱动类,而Hadoop中没有!所以我们要利用之前的方法,把mysql-connector-java-X.X.X.jar包放到Hadoop的lib目录下。【可以参照1.3内容】
分发jar包到所有节点的lib目录下,因为不知道程序实际运行在哪个节点上。
修改程序中的mysql的url地址,改成本地IP。
Code进入cmd命令界面,输入ipconfig,找到IPV4地址,如我的:192.168.174.1
清空数据库输出表内容。
利用MAVEN打jar包上传。
启动集群,运行jar包。
Codehadoop jar Hadoop-1.0-SNAPSHOT.jar cn.wangbowen.mr.db.DBWCApp
可能会报错不被允许连接数据库,这时候要对本地数据库进行一些设置:
可以参考 https://blog.csdn.net/yang5726685/article/details/52529082
查看WEB-UI 8088端口,发现有一个作业正常运行,等一会查看数据库。发现数据和本地的效果一致。