《HBase原理与实践》学习笔记
一、HBase概述
1.1 HBase数据模型
1.1.1 逻辑视图
- table:表
- row:行
- column:列
- timestamp:时间戳
- cell:单元格
1.1.2 物理视图
HBase中的数据是按照列族存储的,即将数据按照列族分别存储在不同目录中。
1.2 HBase体系结构
1.2.1 Master
主要负责HBase系统的各种管理工作:
- 处理用户的各种管理请求,包括建表、修改表、权限操作、切分表、合并数据等。
- 管理集群中的RegionSerer,包括其中Region的负载均衡、迁移等。
- 清理过期日志以及文件
1.2.2 RegionServer
主要用来相应用户IO请求,是HBase中最核心的模块:
WAL(HLog)
- 用于实现数据的高可靠性,HBase数据随机写入时,并非直接写入HFile数据文件,而是先写入缓存,再异步刷新落盘。为了防止缓存数据丢失,数据写入缓存之前需要首先顺序写入HLog。这样即使缓存数据丢失,也能够通过HLog日志恢复。
- 用于实现HBase集群间的主从复制,通过回放主集群推送过来的HLog日志实现主从复制。
BlockCache
HBase中的读缓存。客户端从磁盘上熟读数据后通常会将数据缓存在系统内存中,后续再次访问相同的一行数据,可以直接从内存中获取,对于大量热点读的业务来说,可以很大提高性能。
BlockCache缓存对象是一系类Block块,默认64K。利用了空间局部性和时间局部性原理,来实现。
Region
数据表的一个分片,当数据表大小超过一定阈值就会“水平切分”,分裂为两个Region。Region是集群负载均衡的基本单位。通常一张表的Region会分布在整个集群的多台RegionServer上,一个RegionServer会管理多个Region。
一个Region由一个或者多个Store构成。Store的个数取决于表中列族的个数,多少个列族就有多少个Store。(每个列族的数据都集中存放在一起形成一个存储单元Store)
每个Store由一个MemStore和多个HFile组成。MemStore成为写缓存,用户写入数据时,会先写到MemStore,当MemStore写满之后(阈值默认为128M),系统会异步将数据flush成一个HFile文件。显然,随着数据不断写入HFile文件越来越多,当HFile文件数超过一定阈值的时候,会执行Compact操作,将小文件通过一定策略合并成一个大文件。
1.3 HBase系统特性
1.3.1 HBase的优点
- 容量巨大:单表支持千亿行、百万列的数据规模。
- 良好的可扩展性:集群扩展容易,主要是数据存储节点的扩展以及读写服务节点扩展。(添加RegionServer节点)
- 稀疏性:允许大量列值为空,并不占用任何存储空间。
- 高性能:主要擅长OLTP场景,数据写操作性能强劲,对于 随机单点读 以及 小范围扫描读 ,其性能也能得到保障。对于大范围的扫描读可以使用MR的API,以便实现更高效的并行扫描。
- 多版本:时间戳,可以保留多个历史版本。
- 支持过期:TTL过期特性,只要设置过期时间,就可以自动清理。
- Hadoop原生支持
1.3.2 HBase的缺点
- HBase本身不支持很复杂的聚合运算(如,Join、GroupBy等)。如果业务中需要使用聚合运算,可以在HBase之上架设Phoenix组件(小规模OLTP)或者Spark组件(大规模聚合的OLTP)。
- HBase本身没有二级索引功能,不支持二级索引查找。好在针对HBase实现的第三方二级索引方案非常丰富,比如目前比较普遍的使用Phoenix提供的二级索引功能。
- HBase原生不支持全局跨行事务,只支持单行事务模型。同样,可以使用Phoenix提供的全局事务模型组件来弥补HBase的这个缺陷。
二、基础数据结构与算法
2.1 跳跃表
跳跃表是一种能高效实现插入、删除、查找的内存数据结构,这些操作期望复杂度都是O(logN)。
2.2 LSM树
LSM树(Log-Strucured Merge-Tree)本质上和B+树一样,是一种磁盘数据的索引结构。但和B+树不同的是,LSM树的索引对写入请求更友好。
LSM树的索引一般由两部分组成:
- 内存部分:采用跳跃表来维护一个有序的KeyValue集合。
- 磁盘部分:由多个内部KeyValue有序的文件组成。
2.2.1 KeyValue存储格式
一般来说,LSM中存储的是多个KeyValue组成的集合,每一个KeyValue一般都会用一个字节数组来表示。
HBase为例,其中,Rowkey、Family、Qualifier(列族下的列)、Timestamp、Type这5个字段组成KeyValue中的key部分(Key二进制内容,表示版本号64位long值,HBase中表现为timestamp,type三个必不可少)。Value部分直接存储这个KeyValue中Value的二进制内容。
其中type字段表示这个KeyValue操作的类型,这表明了LSM树内存储的不只是数据,而是每一次记录。
2.2.2 多路并归
类似归并排序算法一样,来合并多个有序文件成一个大文件。
2.2.3 LSM树的索引结构
分为内存部分和磁盘部分,本质是将写入操作全部转化成磁盘的顺序写入,极大地提高了写入操作的性能。但是对读取操作不友好,因为需要在读取的过程中,通过归并所有文件来读取所对应的KV,非常耗资源。因此HBase中设计了异步compaction来降低文件个数。
2.3布隆过滤器
场景
用来判断一个元素是否存在于一个集合中(当集合很大很大远远超出内存的时候)。
原理
布隆过滤器由一个长度为N的01数组组成。一开始全部初始化为0,然后对集合A中的每个元素进行K次Hash并对长度取模后获得K个下标索引,然后把这些下标对应的数组元素置为1。
现在需要判断w是否存在于集合A中,只要把w用上述方法一样K次Hash,每次的结果去数组里面查找,只要有一个值为0就说明这个元素不存在。如果全部为1,只能说明可能存在(假设x经过3次hash后,下标为1、2、6;y为3、4、5,这时候w的hash结果是1、2、3。虽然都是为1,但是属于2个元素)
N=K*|A|/ln2 ,使用这个公式可以保证最佳的误判率。N为数组长度,K为Hash次数,|A|为集合中元素个数。
HBase与布隆过滤器
在HBase 1.X中有3中类型:
- NONE:不启用
- ROW:按照rowkey来计算布隆过滤器的二进制串并存储。
- ROWCOL:按照rowkey+family+qualifier这3个字段拼出byte[]来计算布隆过滤器值并存储。如果在查询的时候,get能指定这3个字段,那么布隆过滤器肯定能提高性能。但是如果缺少任何一个,则无法提升性能。
腾讯团队介绍了一种设计,他们游戏业务rowkey的设计:
rowkey=< userid>#< other-field>
即按照userid来做前缀扫描。(前缀固定,所以计算布隆过滤器的key值也就固定)
总结
布隆过滤器对Get和基于前缀扫描的Scan都非常友好。
三、HBase依赖服务
3.1 ZooKeeper
关于Zookeeper内容可以参考另一篇文章(ZooKeeper学习笔记)。现在来看一下HBase在ZK上创建的内容:
[zk: localhost:2181(CONNECTED) 1] ls /hbase |
meta-region-server:存储HBase集群元数据hbase:meta元数据表所在的RegionServer访问地址。
master/backup-masters:主/备管理节点,防止单点故障。
table:集群中所有表信息。
splitWAL:分布式故障恢复。
rs:集群中所有运行的RegionServer。
等等。。。
3.2 HDFS
HDFS读写流程,具体详情参考文章(Hadoop学习笔记(二)HDFS)。
HDFS在HBase中扮演的角色
- HBase本身不存储文件,它只规定文件格式以及文件内容,实际文存储件由HDFS实现。
- HBase不提供机制保证存储数据高可靠性,数据的高可靠性由HDFS多副本保证。
3.3 HBase在HDFS中的文件布局
drwxr-xr-x - hadoop supergroup 0 2020-05-29 02:05 /hbase/.hbck |
四、HBase客户端
4.1 客户端与服务端交互流程
获取Configuration对象
一般需要3个配置文件:hbase-size.xml、core-site.xml、hdfs-site.xml放到JVM可以加载到的地方
javaConfiguration conf = HBaseConfiguration.create();
通过Configuration初始化Connection
Connection是HBase客户端一切操作的基础,维持了客户端到整个HBase集群的连接。
例如,集群里有2个Master、5个RegionServer。那么Connection会维持一个到Active Master的TCP和5个到RegionServer的TCP。
通常,一个进程只需要为一个独立的集群建立一个Connection即可,并不需要连接池。
javaConnection conn = ConnectionFactory.createConnection(conf);
通过Connection初始化Table
Table是一个非常轻量级的对象,实现了用户访问表的所有API操作,如:Put、Get、Delete、Scan等。本质上,它使用的连接资源、配置信息、线程池、Meta缓存等,都来自Connection对象,因此由同一个Connection创建的多个Table,都可以共享上述这些资源。
注意:branch-1以及之前版本,Table不是线程安全的类,不建议共享一个Table实例。但是,HBase2.0.0之后的版本,Table已经实现为线程安全类。可以通过同一个Connection为每个请求创建一个Table,但是要记得关闭Table对象。
javaTableName tableName = TableName.valueOf("ns1:t1");
Table table = conn.getTable(tableName);通过Table执行Put和Scan操作
put(这里注意到参数都是byte[]):
java// 通过bytes工具类创建字节数组(将字符串)
byte[] rowId = Bytes.toBytes("row3");
// 创建put对象
Put put = new Put(rowId);
byte[] f1 = Bytes.toBytes("f1");
byte[] id = Bytes.toBytes("id");
byte[] value = Bytes.toBytes(102);
put.addColumn(f1, id, value);
// 执行插入
table.put(put);scan:
javaConfiguration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
// scan设置
Scan scan = new Scan();
try (ResultScanner scanner = table.getScanner(scan)) {
// 每次scanner.next()返回一个结果
for (Result result : scanner) {
// 这里可以看出每一个result中的cell都是相同的rowKey
final String rowKey = Bytes.toString(result.getRow());
System.out.println(rowKey);
// 1个result结果包含N个cells
for (Cell cell : result.listCells()) {
System.out.print("[" +
Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + ":" +
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()) + "]" + "\t"
);
}
System.out.println("------------------------------------------");
}
}
}
}
4.2 定位Meta表
HBase一张表由多个Region构成,这些Region分布在不同RegionServer上。客户端在做任何操作的时候,要先确定数据在哪个Region上,然后根据Region的RegionServer信息,去对应的RegionServer上读取数据。因此,有一张特殊的表(hbase:meta),来存放整个集群所有Region的信息。
hbase:meta表始终只有一个Region,这是为了确保meta表多次操作的原子性,因为hbase只支持Region级别的事务。
4.2.1 Meta表构成
总体来说hbase:meta的一个rowkey就对应一个Region。
rowkey 主要由4部分组成
- TableName(业务表明,带命名空间)
- StarRow(业务表Region区间的起始rowkey)
- Timestamp(Region创建的时间戳)
- EncodedName(上面3个字段的MD5 Hex值)
Codens1:t1,,1592659846948.9c7ccaa30b993abc7d60a2d790289e51.
每一行数据又主要分为4列
info:regioninfo:对应的Value存储4个信息
Codecolumn=info:regioninfo, timestamp=1592659847921, value={
// EncodedName
ENCODED => 9c7ccaa30b993abc7d60a2d790289e51,
// RegionName
NAME => 'ns1:t1,,1592659846948.9c7ccaa30b993abc7d60a2d790289e51.',
// Region的StarRow和StopRow
STARTKEY => '',
ENDKEY => ''
}info:seqnumDuringOpen:存储Region打开时的sequenceId
Codecolumn=info:seqnumDuringOpen, timestamp=1592659847921, value=\x00\x00\x00\x00\x00\x00\x00\x02
info:server:存储Region落在哪个RegionServer上
Codecolumn=info:server, timestamp=1592659847921, value=hadoop001:16020
info:serverstartcode:所在RegionServer的启动时间戳
Codecolumn=info:serverstartcode, timestamp=1592659847921, value=1592658086718
4.2.2 热点Region问题
之前说到所有的操作要确定Region的位置,而这些位置信息都在hbase:meta表上。所有请求都访问这个表,那么会产生热点问题。解决方案就是将Region信息缓存在客户端。
以上就是客户端定位Region示意图,在客户端有一个MetaCache的缓存,客户端调用API时,会先去缓存中找业务rowkey所在的Region,情况可能如下:
- Region信息为空:即缓存中没有这个Region信息,那么要去hbase:meta中查找(当然首次查找,需要到ZK集群获取hbase:meta所在的RegionServer)。然后返回一个二元组(regionStartRow, region)放到MetaCache中。
- Region信息不为空,但是RPC请求对应的RegionServer上并没有该Region信息:说明缓存信息过期了,可能是Region迁移了,这时候就要重新去hbase:meta表找了。
- Region信息不为空,且RPC请求对应的RegionServer上存在该Region信息:这就是大部分情况。
4.3 Scan 操作
常用的有:startRow、endRow、Filter、caching、batch、reversed、maxResultSize、version、timeRange等。
我们已经知道可以通过 table.getScanner(scan) 可以拿到一个scanner,然后不断执行scanner.next()就能拿到一个Result。
通常一个Result内部有多个cell,这些cell的rowkey一样,就相当于一行完整的数据。如果设置了setBatch那么就有可能是一行的一部分数据,即有batch个cell。
4.3.1 客户端读取Result流程
每次执行scanner.next()都会去cache队列中拿一个result(步骤4)
如果cache队列为空,那么发起RPC请求当前scanner后续的result数据(步骤1)
客户端收到result列表后(步骤2)
通过scanResultCache把这些result内的多个cell进行重组,最终组成用户需要的result放入cache中(步骤3)
为什么要重组?因为RPC请求有限制,达到某个资源阈值就会立即返回当前获取到的cell,即可能无法一次拿那么多数据,那么这时候从服务器返回的result就有可能是一行中的部分数据,而我们需要的result是一行完整的数据。这时候就需要对result进行重组,来获得我们期望的数据。
以上步骤1+2+3,称为loadCache。
4.3.2 Scan的几个重要概念
- caching:每次loadCache操作最多存放caching个result到cache队列中。控制caching也就可以控制每次loadCache向服务器请求的数据量,避免出现某一次next()操作耗时极长。
- batch:用户拿到的result中最多含有一行数据中的batch个cell。
- allowPartial:跳过重组过程,直接把收到的服务端result返回给用户。
- maxResultSize:loadCache单次RPC操作最多拿到maxResultSize字节的结果集。
4.4 HBase 客户端避坑指南
4.4.1 RPC重试配置要点(几个超时参数)
- hbase.rpc.timeout:单次RPC请求的超时时间(默认60 000ms)
- hbase.client.retries.number:调用API时最多容许多生多少次RPC重试操作(默认35次)
- hbase.client.pause:连续两次RPC重试之间的休眠时间(默认100ms。注意采用的是退避算法,也就是说重试次数越多,则休眠时间越长)
- hbase.client.operation.timeout:单次API(即get/put/delete等操作)的超时时间(默认1 200 000ms)
4.4.2 Scan Filter 设置
PrefixFilter(前缀过滤器)
会返回前缀为指定内容的rowkey,但是不高效,因为会对(-∞,rowkey)区间的内容全部扫描。解决方法是可以设置一个setStartRow(),RegionServer发现有这个属性会首先寻址定位到这个startRow,然后从这个位置开始扫描数据,这样就跳过了大量数据。
不过,更简单的是直接设置setStartRow和setStopRow,即区间直接定位。这样效率最高。
PageFilter(分页过滤器)
用来限制返回的数据数量的。
注意,HBase里Filter状态全部都是Region内部有效的。如果扫描的数据,一旦从一个Region切换到另一个Region那么之前那个Filter的内部状态就无效了(即计数器清0)。新的Region内部用的是一个新的Filter。
当然,如果想实现分页功能,可以直接通过limit实现:setLimit(1000);
4.4.3 少量写和批量写
- table.put(put):单行数据写入,在服务端先写WAL,然后邪写到MenStore,一旦写满flush到磁盘上。吞吐量受限于磁盘带宽、网络带宽、flush速度。但是保证不会丢数据,保证put的原子性。
- table.put(List< Put> puts):批量写入,在客户端缓存put,凑足一批后打包成一次RPC发送到服务端,一次性写WAL,和MenStore。耗时会长一点,另外如果put分布在多个Region内,可能有一部分会失败(HBase不提供跨Region的多行事务),失败的会重试。
- bulk load:直接将带写入数据生成HFile,直接加载到对应的Region下的CF内。在HBase服务器端没有任何RPC只有load HFile时会调用,是一种完全离线的快速写入方式。
五、RegionServer的核心模块
RegionServer的内部结构,在之前已经了解过了。现在我们来逐个解析。
5.1 HLog
HBase中系统故障恢复及主从复制都基于HLog实现。
5.1.1 HLog文件结构
每个RegionServer拥有一个或多个HLog(默认1个,1.1以后允许多个)。每个HLog是多个Region共享的。
5.1.2 HLog文件存储
在HDFS上的目录:
/hbase/WALs/hadoop001,16020,1592727464395/hadoop001%2C16020%2C1592727464395.1592727473513 |
有3个部分:hadoop001是RegionServer域名,16020端口号,1592727464395目录生成时间戳。
可以使用命令查看内容:hbase hlog
(但是我这个版本好像没有hlog,而是wal)
[hadoop@hadoop002 /home/hadoop/app/hbase-2.2.4/logs]$hbase wal /hbase/WALs/hadoop002,16020,1592729053980/hadoop002%2C16020%2C1592729053980.1592729063117 |
5.1.3 HLog生命周期
HLog构建:HBase任何写入(更新、删除)操作都会先把记录追加写入HLog文件中。
HLog滚动:HBase后台启动一个线程,每隔一段时间(’hbase.regionserver.logroll.period‘决定,默认1小时)进行日志滚动。滚动的主要目的是方便过期日志以文件形式删除。
HLog失效:写入数据一旦从MemStore落盘,那么对应的日志数据就失效了。只要日志文件中所有日志记录都已经落盘,那么该文件失效。一旦失效,被移动到oldWALs文件夹。(注意此时HLog还没有被删除)
HLog删除:Mater后台会启动一个线程,每隔一段时间(’hbase.master.cleaner.interval‘,默认1分钟)检查一次oldWALs文件夹下的所有失效日志文件,确定可以删除后执行删除:(确认条件主要有2个)
该HLog文件是否还在参与主从复制
该HLog文件是否已经在oldWALs目录中存在10分钟(可以通过’hbase.master.logcleaner.ttl‘设置,默认10分钟)
有个奇怪的地方,我的集群内,oldWALs并不会自动清理!?查阅资料后添加参数也没有效果。即,存在一大堆 /hbase/oldWALs/pv2-00000000000000000596.log 这样的文件,它们生成的时间间隔是1小时,可以看出是HLog滚动的1小时。
5.2 MemStore
HBase没有直接使用原始的跳跃表,而是用了JDK自带的ConcurrentSkipListMap,此外这是线程安全的。
MemStore由两个ConcurrentSkipListMap实现,写入操作会先写入A,当A中数据量达到阈值之后,会创建B来接受用户新的请求,而已经写满的A,会执行异步flush落盘形成HFile。
MemStore的GC问题
既然是缓存,那么就会涉及到GC问题。MemStore本身会占用大量内存,因此GC问题不可避免。而且MemStore的工作模式会引起严重的内存碎片问题。
一个RegionServer有多个Region构成,一个Region有多个MemStore。而这些所有的MemStore会共享内存,即数据混合在一起。因此如果有一个Region执行落盘操作,那么会产生大量内存碎片。
MSLAB内存管理方式
为了解决上述内存碎片可能导致的Full GC。借鉴了线程本地分配缓存的内存管理方式:
- 每个MemStore会实例化得到一个MemStoreLAB对象
- MemStoreLAB会申请一个2M大小的Chunk数组,同时维护一个Chunk偏移量,初始为0
- 当一个KeyValue值插入MemStore后,MemStoreLAB会把data数组复制到Chunk数组中,然后移动偏移量
- 当前Chunk满了之后,再申请一个新的Chunk
这样通过整块的方案即将一个Region的数据放到一起,可以很大程度减少内存碎片。但是这还存在一些小问题。
MemStore Chunk Pool
当Chunk写满之后,系统会重新申请一个新的Chunk,新建Chunk对象会再JVM的新生代申请内存,如果申请比较频繁会导致JVM的新生代Eden区满掉,触发YGC。
MemStore Chunk Pool 的思路:
- 系统创建一个Chunk Pool来管理所有未被引用的Chunk,这些Chunk就不会再被JVM当作垃圾回收。
- 如果一个Chunk没有再被引用,将其放入Chunk Pool
- 如果当前Chunk Pool已经达到容量最大值,就不会再接纳新的Chunk
- 如果需要申请新的Chunk来存储KeyValue,首先从Chunk Pool中获取,如果有就重复利用,没有就申请一个新的Chunk。
5.3 HFile
5.3.1 逻辑结构(V2)
HFile文件主要有4个部分:
Scanned Block 部分:表示顺序扫描的时候数据块将会被读取。
这个部分包含3个数据块:
- Data Block:存储用户KeyValue数据
- Leaf Index Block:存储索引树的叶子节点数据(索引树有3层,这是最后一层)
- Bloom Block:存储布隆过滤器相关数据
Non-scanned Block 部分:再HFile顺序扫描时候数据不会被读取
主要包括两部分:
- Meta Block
- Intermediate Level Data Index Blocks(索引树第二层)
Load-on-open 部分:这部分数据会在RegionServer打开HFile时直接加载到内存中
包括:
- FlieInfo:固定长度数据块,主要记录文件的一些统计元信息,比较重要的是AVG_KEY_LEN和AVG_VALUE_LEN分别表示平均Key和Value的长度
- 布隆过滤器MetaBlock:记录布隆过滤器相关元数据信息
- RootDataIndex(索引树第一层,即索引树根节点信息)
- Meta Index Block
Trailer 部分:主要记录了HFile的版本信息、其他各个部分的偏移值和寻址信息。
5.3.2 物理结构
HFile文件由各种不同类型的Block(数据块)构成,虽然这些Block的类型不一样,但是数据结构相同。
Block的大小可以创建列族的时候加上参数 blocksize=>’65535’ 指定,默认是64K。大号Block有利于大规模顺序扫描,而小号Block有利于随机查找。
HFileBlock 结构
HFileBlock主要包含两部分:
5.3.3 HFile基础Block
1) Trailer Block
主要记录了HFile的版本信息、各个部分的偏移量和寻址地址信息。
RegionServer在打开HFile时,会加载所有HFile的Trailer部分以及load-on-open部分到内存中。实际加载过程首先会解析Trailer Block,然后进一布加载load-on-open。具体步骤如下:
- 加载 HFile version 版本信息,包含majorVersion和minorVersion(主版本V1、V2还是V3,次版本信息),不同版本使用不同的文件解析器对HFile进行读取解析。
- HBase根据version信息计算Trailer Block大小,加载整个HFile Trailer Block到内存中。其中有很多信息,如上图。
- 然后根据其中两个重要字段:LoadOnOpenDataOffset和LoadOnOpenDataSize,前者表示偏移量,后者表示大小。HBase会在启动后讲load-on-open部分的数据全部加载到内存中。
2) Data Block
Bata Block 是 HBase中文件读取的最小单元。Data Block中存储用户的KeyValue(HBase存储的核心),HBase中所有数据都是以KeyValue结构存储在HBase中。
KeyValue结构:
其中key length 和 value length 是两个固定长度的数值。value是实际写入的数据。
key由多个部分组成,从上图可以看出有各个部分。其中KeyType类型有4种:Put、Delete、DeleteColumn、DeleteFamily
从这里可以看出,每个KeyValue都包含rowkey、column family、column qulifier。因此在表结构设计时,这些字段尽可能设置短的原因。
3) 布隆过滤器相关Block
布隆过滤器之前说过了。可以想象HFile文件越大,里面KeyValue存储的越多,那么位数组就相应越大,太大就不适合加载到内存了。因此HFile V2 在设计上将位数组进行拆分成多个独立的位数组(根据Key拆分,一部分连续JKey使用一个位数组)。这样根据Key查询的时候,只要加载相应的位数组,减少内存开销。
在文件结构上多个位数组对应多个Bloom Block(数组+firstkey),为了方便定位对应的位数组,V2又设计了Bloom Index Block:
HFile种仅有一个Bloom Index Block数据块,位于load-on-open部分。如上图,从大方向上可以分为2部分:一个是HFile种布隆过滤器元数据基本信息(绿色),以及指向Bloom Block的索引项(红色)。
其中Block Key(图上显示FirstKey)是个非常关键的字段,表示该Index Entry指向的Bloom Block种第一个KV的Key值。BlockOffset表示对应Bloom Block在HFile中的偏移量。
get请求根据布隆过滤器查找流程:
- 首先根据带查找Key在Bloom Index Block所有的索引项中根据BlockKey进行二分查找。定位到对应的Bloom Index Entry。
- 再根据Bloom Index Entry中BlockOffset以及BlockOndiskSize加载该Key对应的位数组
- 对Key进行Hash映射,根据映射结果再位数组中查看是否都是1,如果不是,表示不存在Key,否则有可能存在。
4) 索引相关Block
根据索引层级不同,HFile中索引结构分为两种:single-level和mutil-level,前者表示单层索引,后者表示多级索引,一般为两级或者三级。
V2版本的Index Block有两类:
Root Index Block(索引数根节点,属于load-on-open部分)
如图为多层索引的结构(单层索引,仅仅缺少mid的3个字段)。其中Index Entry为具体的索引对象。由3个字段组成:索引指向的DataBlock偏移量、DataBlock在磁盘上的大小、索引指向DataBlock中第一个的Key。
除此之外的MidKey相关信息,用于在对HFile进行split操作时,快速定位HFile的切分点位置。
NonRoot Index Block
- Intermediate Index Block(中间节点,属于Non-Scanned block 部分)
- Leaf Index Block(叶子节点,直接指向实际DataBlock。属于scanned block部分)
一开始,由于HFile刚开始数量小,索引采用单层索引,只有Root Index一层索引,直接指向Data Block。当数据变大的时候,Root Index Block大小超过阈值后,索引分裂为多级结构,由一层索引变为两层,根节点指向叶子节点,叶子节点指向实际Data Block。如果数据更大,索引层变为三层。
NonRootIndexBlock 结构
不管是中间还是叶子节点,结构都是一样的。和根索引块不同的是多了一个内部索引(entry offset),表示index Entry相对于第一个Index Entry的偏移量。用于实现二分查找,加快速度。
5.3.4 HFile文件查看工具
$hbase hfile |
示例:通过shell命令来查看内容,这里引用6.2.2生成的HFile来进行查看:
$hbase hfile -m -f hdfs://mycluster/MRData/out/f1/5314cd3465004280a7ad779dbcbe6a2f |
5.4 BlockCache
BlockCache是RegionServer级别的,一个RegionServer只有一个BlockCache,在RegionServer启动的时候完成初始化工作。目前为止有3种实现方案:
- LRUBlockCache(默认):将数据放入JVM Heap中
- SlabCache:部分数据存放在堆外,可以缓解系统长事件GC
- BucketCache:同上
六、HBase读写流程
6.1 HBase写入流程
6.1.1 客户端处理阶段
用户提交put请求后,HBase客户端会将写入的数据添加到本地缓冲区中,符合一定条件(阈值大小默认2M)就会通过AsyncProcess异步批量提交。
注意!HBase默认设置autoflush为true,表示put请求会直接提交给服务器处理。将autoflush设置成false后,可以极大提升写入吞吐量,但是没有保护机制,如果客户端崩溃,会导致已经提交的数据丢失。
在提交前,HBase会在元数据表hbase:meta中根据rowkey找到它们归属的RegionServer。如果是批量请求,还会把这些rowkey按照HRegionLocation分组,不同分组的请求意味着发送到不同的RegionServer,因此每个分组对应一次RPC请求。
- 首先客户端根据rowkey在元数据缓存中查找,如果能找到该rowkey所在的RegionServer和Region,就可以直接发送写入请求(携带Region信息)到目标RegionServer
- 如果客户端缓存没有找到对应信息,那么就要去ZK集群获取hbase:mate表所在的RegionServer,然后向该RegionServer发送查询请求,在hbase:meta表中查找rowkey所在的RegionServer以及Region信息,并将结果缓存在客户端。
- 客户端根据获得的相关元数据信息将写入请求发送给目标RegionServer。
6.1.2 Region 写入阶段
服务端RegionServer接受到客户端的写入请求后,首先会反序列化为put对象,然后执行各种检查。检查后,执行一系列核心操作:
Acquire locks:HBase中使用行锁保证对同一行数据的更新原子性。
Update LATEST_TIMESTAMP timestamp:更新所有待写入(更新)KeyValue的时间戳为当前系统时间。
Build WAL edit:在内存中构建WALEdit对象。(注意!这时候还没有写到HLog)
Append WALEdit To WAL:将步骤3中构造的在内存中的WALEdit记录顺序写入HLog中,此时不需要sync(同步到HDFS)操作。
1. HLog持久化等级
- SKIP_WAL:只写缓存,不写HLog。有数据丢失风险。
- ASYNC_WAL:异步将数据写入HLog日志中。
- SYNC_WAL:同步将数据写入日志文件中,注意数据只是写入文件系统,并没有真正落盘。还要看HDFS Flush策略。
- FSYNC_WAL:同步将数据写入日志文件并强制落盘。
- USER_DEFAULT:默认使用SYNC_WAL
2. HLog写入模型
HLog写入都要经过3个阶段:首先将数据写入本地缓存,然后将本地缓存写入文件系统,最后执行sync操作同步到磁盘。
当前版本使用了LMAX Disruptor框架实现了无锁有界队列操作。
Write back to MemStore:写入WAL之后,再将数据写入MemStore。
MemStore使用数据结构ConcurrentSkipListMap来实际存储KeyValue。
根据之前的知识可以知道HBase使用了MSLAB机制,预先申请一个大的(2M)Chunk内存,避免严重的内存碎片问题而触发的Full GC。写入的KeyValue会进行一次封装,顺序拷贝到这个Chunk中。因此MemStore写入过程可以归为3步:
- 检查当前Chunk是否写满,如果写满,重新申请一个2M的Chunk
- 将当前KeyValue在内存中重新构建,在可用Chunk的指定offset处申请内存创建一个新的KeyValue
- 将新创建的KeyValue对象写入ConcurrentSkipListMap中。
Release row locks:释放行锁。
Sync wal:HLog真正sync到HDFS,再释放行锁之后执行sync操作是为了尽量减少持锁时间,提升写性能。如果sync失败,执行回滚操作将MemStore中已经写入的数据移除。
结束写事务:此时该线程的更新操作才会对其他读请求可见,更新才实际生效。
6.1.3 MemStore Flush 阶段
当MemStore越来越大,达到一定的条件的时候,就会触发flush操作:
1) 触发条件
MemStore级别限制:Region中任意一个MemStore大小达到上限(hbase.hregion.memstore.flush.size,默认128MB)
Region级别限制:当Region中所有MemStore的大小总和达到上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size)
RegionServer级别限制:当RegionServer中MemStore的大小总和超过低水位阈值(hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size)开始强制执行flush,先flush MemStore最大的Region,再flush次大的。
如果此时写入吞吐量依然很高,导致总MemStore大小超过高水位阈值hbase.regionserver.global.memstore.size,RegionServer会阻塞更新并强制执行flush,直至总MemStore大小下降到低水位阈值。
当一个RegionServer中HLog数量达到上限(hbase.regionserver.maxlogs)
HBase定期刷新MemStore(默认周期1小时,确保MemStore不会长时间没有持久化)
手动执行flush(可以通过shell命令:flush ‘tablename’ 或者 flush ‘regionname’分别对表和Region进行flush)
一般情况下flush不会对系统有太大的影响,只有RegionServe级别的会阻塞所有落在该RegionServer上的写入操作。直到MemStore数据量降低到配置阈值内。
2) 执行流程
为了减少flush过程对读写的影响(写数据的时候会写到MemStore,而再flush的时候数据不能改变),HBase采用了类似于两个阶段提交的方式,将flush过程分为3个阶段:
prepare阶段:遍历当前Region中所有的MemStore,将MemStore中当前数据集CellSkipListSet(内部实现采用ConcurrentSkipListMap)做一个快照snapshot,然后再新建一个CellSkipListSet接受新的数据写入(即5.2中说的AB两个跳跃表)。
flush阶段:遍历所有MemStore,将prepare阶段生成的snapshot持久化为临时文件(第3点介绍具体流程),这个过程涉及磁盘IO操作,因此相对耗时。
commit阶段:遍历所有的MemStore,将flush阶段生成的临时文件移到指定的ColumnFamily目录下(应该就是HDFS目录 /hbase/data/命名空间/表名/region名/列族名 下),针对HFile生成对应的storeflie(实际上StoreFile就是对HFile做了轻量级包装,即StoreFile底层就是HFile)和Reader,把storefile添加到Store的storefiles列表中,最后再清空prepare阶段生成的snapshot。
3) HFile 文件构建流程
HBase执行flush操作之后将内存中的数据按照特定格式写成HFile文件,HFile文件中各个Block部分的构建流程如下:(文件结构在5.3中已经说明)
构建“Scanned Block”部分
MemStore执行flush,首先新建一个Scanner,这个Scanner从存储KV数据的CellSkipListSet中依次从小到大读出每个cell(KeyValue)。
appendGrneralBloomFilter:在内存中使用布隆过滤器算法构建Bloom Block,下文也称为(Bloom Chunk)。
appendDeleteFamilyBloomFilter:针对标记为”DeleteFamily”或者”DeleteFamilyVersion”的cell,在内存中使用布隆过滤器算法构建Bloom Block,基本流程和appendGrneralBloomFilter相同。
(HFile.Write)writer.append:将cell写入Data Block中,这是HFile构建的核心。
构建 Bloom Block
布隆过滤器内存中维护了多个称为chunk(即Block,具体实现叫Chunk)的数据结构,一个chunk的组成:
- 一块连续的内存区域,主要存储一个特定的数组。默认数组中所有位都是0。对于row类型的布隆过滤器,cell进来之后会对其rowkey执行hash映射,将其映射到位数组某一位,并修改值为1。
- fistkey,第一个写入该chunk的cell的rowkey,用来构建Bloom Index Block(可以看5.3.3第3点图中左边红色部分)。
cell写进来之后,首先判断当前chunk是否已经写满(chunk个数是否超过阈值)。如果超过阈值,会重新申请一个新的chunk,并将当前chunk加入ready chunks集合中。如果没有写满,则根据布隆过滤器算法使用多个hash函数分别对cell的rowkey进行映射,并修改值为1。
构建 Data Block
一个cell在内存中生成对应的布隆过滤器信息(即,位数组置1)之后就会写入DataBlock,写入过程分两步:
Encoding Key Value:使用特定编码对cell进行编码处理。
编码思路:根据上一个KeyValue和当前KeyValue比较之后取delta(即,分别对rowkey、column family、 column进行比较然后取delta)。假如前后两个前后两个KeyValue的rowkey相同,当前rowkey就可以用一个特定的flag标记,不需要完整地存储整个rowkey。这样在某些场景下可以极大减少存储空间。
将编码后的KeyValue写入DataOutputStream
随着cell不断写入,当前Data Block会因为大小超过阈值(默认64KB)而写满。写满后Data Block会将DataOutputStream的数据flush到文件,该Data Block此时完成落盘。
构建 Leaf Index Block
Data Block落盘之后会立刻在内存中构建一个Leaf Index Entry对象,并加入到当前Leaf Index Block中。Leaf Index Entry对象有三个重要字段:
- firstKey:落盘Data Block的第一个key。用来作为索引节点的实际内容。
- blockOffset:落盘Data Block在HFile文件中的偏移量。用于定位Data Block。
- blockDataSize:落盘Data Block的大小。用于定位之后加载数据。
同样的,Leaf Index Block随着Entry的不断写入慢慢变大,一旦超过阈值(64KB),就要flush到文件执行落盘。需要注意的是,Leaf Index Block落盘是追加写入文件的,所以会形成HFile中Data Block和Leaf Index Block交叉出现的原因。
和Data Block一样,在Leaf Index Block落盘之后,还需要往上构建Root Index Entry并写入Root Index Block,形成索引树的根节点。但是根节点没有追加写入到”Scanned block”部分,而是最后写入”load-on-open”。
构建 Bloom Block Index
完成Data Block落盘还有一件非常重要的事情:检查是否有已经写满的Bloom Block。如果有,将该Bloom Block追加写入文件(在第二步中只是把chunk加到一个集合中),在内存中构建一个Bloom Index Entry并写入Bloom Index Block。
基本流程总结
flush阶段生成的HFile和Compaction阶段生成的HFile流程完全相同,不同的是flush读取的MemStore中的KeyValue,而Compaction读取的是多个HFile中的KeyValue写成一个大的HFile,即KeyValue来源不同。
首先从MemStore中获取KeyValue,然后根据KeyValue通过布隆过滤器算法生成Bloom Block加到ready chunks集合中,然后写入Data Block。一旦Data Block写满,就要将其落盘,同时构造一个索引Leaf Index Entry 加到 Leaf Index Block中,直到写满落盘(追加到Data Block之后)时,再构造一个索引Root Index Entry 加到 Root Index Block(后期写入”load-on-open”)。紧接着Data Block落盘,在raedy chunks中的chunk也要落盘(即,Bloom Block落盘),也是追加写入的。然后构建该Bloom Block的索引 Bloom Index Entry 加到 Bloom Index Block。
实际上,每写入一个KeyValue就会动态的去构建“Scanned Block”部分,等所有KeyValue都写入完成之后再再静态地构建“Non-scanned Block”部分、“Load on open”部分以及“Trailer”部分。
6.2 BulkLoad 功能
有这么一个场景:
用户数据位于HDFS中,业务需要定期将这部分数据海量导入HBase系统,以执行随机查询更新操作。这种场景如果调用API操作的话,会给RegionServer带来极大的压力。
- RegionServer频繁flush,不断compact、split。影响稳定性。
- RegionServer频繁GC。
- 消耗大量CPU、带宽、内存、IO资源,与其他资源产生竞争。
- 某些场景下,比如平均KV大小比较大,会耗尽RegionServer的处理线程,导致集群阻塞。
在4.4.3中提到过。bulk load:直接将带写入数据生成HFile,直接加载到对应的Region下的CF内。在HBase服务器端没有任何RPC只有load HFile时会调用,是一种完全离线的快速写入方式。
6.2.1 核心流程
HFile生成阶段
这个阶段会运行一个MapReduce任务,mapper需要自己来实现。将HDFS文件中的数据读出来组装成一个复合KV,其中Key是rorkey,Value可以是KeyValue对象、Put对象甚至是Delete对象;reducer由HBase负责,通过HFileOutputFormat2.configureIncrementalLoad()进行配置,这个方法负责:
- 根据表信息配置一个全局有序的partitioner
- 将partitioner文件上传到HDFS集群并写入DistributedCache
- 设置 reduce task 的个数为目标Region的个数
- 设置输出key/value类满足HFileOutputFormat所规定的格式要求
- 根据类型设置reducer执行相应的排序(KeyValueSortReducer或者PutSortReducer)
这个阶段会为每个Region生成一个对应的HFile文件
HFile导入阶段
在HFile准备就绪后,就可以使用工具completebulkload将HFile加载到在线HBase集群。completebulkload工具负责:
- 依次检查第一步生成的所有HFile文件,将每个文件映射到对应的Region
- 将HFile文件移动到对应Region所在的HDFS文件目录下
- 告知Region对应的RegionServer,加载HFile文件对外提供服务
6.2.2 基础案例
生成HDFS上的数据源
BulkLoad的数据源一定是在HDFS上的文件!如果是其他地方的数据要转换成HDFS上的文件。
Code// 首先我们创建一个文件上传到HDFS中
[BulkLoadData.txt]
100 zhangsan 16
101 wangwu 11
102 lisi 30
$hdfs dfs -mkdir /MRData
$hdfs dfs -put BulkLoadData.txt /MRData/BulkLoadData.txt
$hdfs dfs -ls /MRData/
Found 1 items
-rw-r--r-- 3 hadoop supergroup 42 2020-06-22 22:08 /MRData/BulkLoadData.txt使用MR Job Driver程序将源文件转化为HFile
javaimport org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
/**
* BulkLoadDemo class
*
* @author BoWenWang
* @date 2020/6/22 22:12
*/
public class BulkLoadDemo {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取运行参数(其中至少一个输入路径以及一个输出路径)
Configuration conf = HBaseConfiguration.create();
String[] remainingArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (remainingArgs.length < 2) {
System.err.println("Usage: BulkLoadDemo <in> <out>");
System.exit(2);
}
// HBase
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf("ns1:t1"));
// 创建Job任务
Job job = Job.getInstance(conf, "BulkLoadDemo");
// 设置Jar包运行的类
job.setJarByClass(BulkLoadDemo.class);
// (核心)设置Map
job.setMapperClass(BulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
// (核心)通过HFileOutputFormat2.configureIncrementalLoad对Reduce进行配置
HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("ns1:t1")));
// 设置输入输出数据类型
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
// 指定数据输入、输出路径
for (int i = 0; i < remainingArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(remainingArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[remainingArgs.length - 1]));
// 将job提交给集群运行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
* Mapper程序
*/
public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
public static final byte[] ID_QUALIFIER = "id".getBytes();
public static final byte[] NAME_QUALIFIER = "name".getBytes();
public static final byte[] AGE_QUALIFIER = "age".getBytes();
public static final byte[] COLUMN_FAMILY = "f1".getBytes();
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");
byte[] rowKey = ("row" + fields[0]).getBytes();
byte[] id = fields[0].getBytes();
byte[] name = fields[1].getBytes();
byte[] age = fields[2].getBytes();
// 存储的数据
final ImmutableBytesWritable putRowKey = new ImmutableBytesWritable(rowKey);
Put put = new Put(rowKey).addColumn(COLUMN_FAMILY, ID_QUALIFIER, id)
.addColumn(COLUMN_FAMILY, NAME_QUALIFIER, name)
.addColumn(COLUMN_FAMILY, AGE_QUALIFIER, age);
context.write(putRowKey, put);
}
}
}然后打包jar到集群上运行:
Code$hadoop jar HBase2-1.0-SNAPSHOT.jar BulkLoadDemo hdfs://mycluster/MRData hdfs://mycluster/MRData/out
然后查看结果:
Code$hdfs dfs -ls -R /MRData
-rw-r--r-- 3 hadoop supergroup 42 2020-06-22 22:08 /MRData/BulkLoadData.txt
drwxr-xr-x - hadoop supergroup 0 2020-06-22 23:50 /MRData/out
-rw-r--r-- 3 hadoop supergroup 0 2020-06-22 23:50 /MRData/out/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2020-06-22 23:50 /MRData/out/f1
-rw-r--r-- 3 hadoop supergroup 5319 2020-06-22 23:50 /MRData/out/f1/5314cd3465004280a7ad779dbcbe6a2f
注意!可能会报错NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration,原因是没有导入hbase相关的包。因此要在hadoop-env.sh中添加如下内容:
Code
# 改成你自己对应的hbase目录
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/hadoop/apps/hbase/lib/*
将生成的HFile加载到HBase
Code$hadoop jar hbase-mapreduce-2.2.4.jar completebulkload hdfs://mycluster/MRData/out ns1:t1
这里要注意!可能是书上使用的HBase和我使用的不一样,导致命令失效,这个是采用HBase官网上找的命令。为此写了一篇博文:https://blog.csdn.net/weixin_42167895/article/details/106913686
原书上命令如下:
Code$ bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
另一种是
CodeHADOOP_CLASSPATH='${HBASE_HOME}/bin/hbase classpath' ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar completebulkload <hdfs://storefileoutput> <tablename>
scan 查看表内容确实导入成功
6.3 HBase读取流程
6.3.1 Client-Server读取交互逻辑
和6.1写入阶段客户端处理方式一样。Client会首先从ZooKeeper中获取元数据hbase:meta表所在的RegionServer,然后根据待读写rowkey发送请求到元数据所在RegionServer,获取数据所在的目标RegionServer和Region(并存储到本地缓存),最后将请求进行封装发送到目标RegionServer进行处理。(参考6.1.1和4.3.1 Scan流程)
6.3.2 Server端Scan框架体系
从宏观角度看,依次Scan可以能会扫描一张表的多个Region,对于这种扫描,客户端会根据hbase:meta元数据将扫描的起始区间进行划分,切分成多个相互独立的查询子区间,每个子区间对应一个Regin。
比如:当前表有3个Region,区间分别为[“a”, “c”),[“c”, “e”),[“e”, “g”),客户端设置的scan扫描区间为[“b”, “f”)。因为扫描跨越了多个Region,所以需要进行切分,切分后的区间[“b”, “c”),[“c”, “e”),[“e”, “f”)
HBase种每个Region都是一个独立的存储引擎,因此客户端可以将每个子区间请求分别发送给对应的Region进行处理。
RegionServer接收到客户端的get/scan请求后:
1) 构建Scanner Iterator体系
- 一个RegionScanner由多个StoreScanner构成。一张表由多少个列族组成,就有多少个StoreScanner,每个StoreScanner负责对应Store的数据查找。(即图上步骤1)
- 一个StoreScanner由MemStore和StoreFileScanner构成。每个Store的数据由内存中的MemStore和磁盘上的StoreFile文件组成。因此分别为两者创建Scanner。(步骤2)
注意!RegionScanner和StoreScanner并不负责实际查找操作,他们更多是承当组织调度任务,负责KeyValue最终查找的操作的StoreFileScanner和MemStoreScanner。
2) 过滤淘汰部分部不满足查询条件的Scanner
StoreCanner为每一个HFile构建一个对应的StoreFileScanner,需要注意的是,并不是每一个HFile都包含用户想要查找的KeyValue。相反,可以用过一些查询条件过滤掉很多肯定不存在待查找KeyValue的HFile(步骤3,其中StoreFile3检查不通过而被过滤)。主要过滤策略有:
Rowkey Range过滤
因为StoreFile中所有KeyValue数据都是有序排列的,所以如果待检索范围[startkey, stopkey]与文件起始key范围[firstkey, lastkey]没有交集,如stoprow<firstkey或者startkey>lastkey,就可以过滤掉该StoreFile。
Time Range过滤
StoreFile中元数据有一个关于该File的TimeRange属性,和上面一样,如果范围没有交集,过滤。
布隆过滤器
在5.3.3第三点和6.1.3第三点中已经知道了结构。系统根据待检索的rowkey获取对应的Bloom Block并加载到内存(通常情况下热点Bloom Block会常驻内存),再用hash函数对待检索rowkey进行hash,根据hash后的结果再布隆过滤器数据中进行寻址,即可确定待检索rowkey是否一定不存在于该HFile中。
3) 每个Scanner seek 到 startKey
在每个HFile文件(或MemStoer)中seek扫描起始点startKey。如果没有HFile中没有找到startKey,则seek下一个KeyValue地址。(步骤4)
在一个HFile文件中seek待查找的key,该过程可以分解为4步操作:
根据HFile索引树定位目标Block
HRegionServer打开HFile时会将所有HFile的Trailer部分和Load-on-open部分加载到内存。其中Load-on-open部分有一个非常重要的Root Index Block,索引树的根节点。
BlockKey是整个Block的第一个rowkey。(索引树流程图在5.3.3 第4点,红虚线箭头表示一次查询索引过程。第一次根据rowkey二分查找,找到区间的第一个rowkey,这个root index block常驻内存所以很快,然后根据偏移量和大小,把对应中间节点索引Block载到内存。然后在该Block中通过二分查找定位范围,指向叶子节点索引Block加载到内存,同样的再进行定位,最后指向Data Block,加载到内存。而Data Block中存放的都是KeyValue,所以通过遍历的方式找到对应的KeyValue,完成seek。)
这里我有一个理解(重点):就是在上面过滤完Scanenr后,每一个Scanner对应一个HFile。首先它会吧HFile的Trailer部分和Load-on-open部分加载到内存,以便获取到索引树。然后根据索引树去查找rowkey所在的Data Block,获取对应的BlockOffset和BlockDataSize。然后利用这些信息先去BlockCache里面看有没有DataBlock的缓存,如果有直接用(即下面的第二点)。如果没有就要去HDFS文件中找(即下面的第三点,因为数据都是放在HDFS上的),找到后加载到内存并缓存到BlockCache里。然后从Block中遍历KeyValue(即下面第四点)。因此第一点是主流程,剩下的3点只是条件分支,下面第3点有一个总体的流程,和这个描述差不多。
BlockCache 中检索目标Block
Block缓存到BlockCache之后会构建一个Map,Map的Key是BlockKey,Value是Block在内存中的地址。其中BlockKey由两部分构成——HFile名称以及Block在HFile中的偏移量。很显然,BlockKey是全局唯一的。根据BlockKey可以获取该Block在BlockCache中内存位置(即通过Map),然后直接加载出该Block对象。如果Block在BlockCache中没有找到待查Block,就需要去HDFS文件中查找(即,缓存中没有就要去磁盘上找)。
HDFS文件中检索目标Block
在上文中说到根据文件索引提供的BlockOffset以及BlockDataSize这两个元素可以在HDSF上读取到对应的DataBlock内容。这个阶段HBase下发命令给HDFS,HDFS执行真正的Data Block查找工作。
HBase阶段
- 根据HFile中Block索引信息定位给定KV所在的DataBlock的基本信息:BlockOffset和BlockDataSize
- 根据HFile以及BlockOffset和BlockDataSize在BlockCache中查找是否存在,如果在直接返回,否则在HDFS中进行文件查找
- 调用FSDataInputStream(HFile文件打开后HBase就会为该文件建议一个输入流,后续该文件的所有读操作都会使用这个全局的FSDataInputStream),返回该文件偏移量为BlcokOffset,数据量为BlockDataSize大小的数据
HDFS-NameNode阶段
NameNode根据文件返回属于该文件的所有HDFS Block列表,客户端(这里的客户端是指HBase客户端,调用HDFS的API进行读取数据)可以将HDFSBlock列表进行缓存。
这里要说明一下,在HDFS中,文件被切分为多个Block(这里可以理解为一个HFile被分割成很多个Block存在磁盘上)。这里的HDFSBlock列表,也就相当于是多个HDFSBlock组合成一个HFile。
遍历HDFSBlock列表,根据HDFSBlock元数据以及HBaseBlockOffset、HBaseBlockDataSize确定待查找HBase Block所属的HDFSBlock。
因为HFile中的Block存在于HDFSBlock列表中的部分HDFSBlock上,并不是所有HDFSBlock都有需要的数据。
NameNode上会存储数据文件与这些HDFSBlock的对应关系。
定位到HDFSBlock之后,再在NameNode中查找HDFSBlock都在哪些DateNode上,根据一定的规则选择最优的DateNode(本地DN最优)
NameNode存放的只是元数据信息,真正的数据是放在DataNode上的。
HDFS-DataNode阶段
同选定DN建立通信,传进HDFSBlockId、数据在该Block中偏移量以及数据量大小
DN会在该HDFSBlock中seek到指定偏移量,并从磁盘读取指定大小的数据返回。
注意:ND不会加载出整个HDFSBlock数据(128M)
磁盘阶段
HDFS读取磁盘数据是按照磁盘最小IO单位(4K)进行读取,直至读取出完整的64K数据。
从Block中读取待查找KeyValue
HFile Block由KeyValue(由小到大依次存储)构成,但这些KeyValue并不是固定长度的,只能遍历扫描查找。
4) KeyValueScanner合并构建最小堆
将该Store中的所有StoreFileScanner和MemStoreScanner合并形成一个heap(最小堆)。最小堆管理Scanner可以保证取出来的KeyValue都是最小的,保证有序。(步骤5)
5) 执行next函数获取KeyValue并对其进行条件过滤
经过上述Scanner体系的构建后,此时从小到大的KeyValue已经可以由KeyValueScanner获得,但是还要进一步检查,以及是否满足用户设定的过滤条件:
- 检查该KeyValue的KeyType是否是Deleted/DeletedColumn/DeletedFamily等,如果是,则直接忽略该列所有其他版本,跳到下一列(列族)。
- 检查该KeyValue的Timastamp是否在用户设定的Timestamp Range范围。
- 检查该KeyValue是否满足用户设置的各种filer过滤器。
- 检查该KeyValue是否满足用户查询中设定的版本数
过滤完之后,返回给用户数据。每次返回多少数据还要看用户设置的参数。可以看4.3.1的内容。
七、Compaction 实现
待续。。。