avatar

目录
《HBase原理与实践》学习笔记

《HBase原理与实践》学习笔记

一、HBase概述

1.1 HBase数据模型

1.1.1 逻辑视图

  • table:表
  • row:行
  • column:列
  • timestamp:时间戳
  • cell:单元格

1.1.2 物理视图

HBase中的数据是按照列族存储的,即将数据按照列族分别存储在不同目录中。

1.2 HBase体系结构

avatar

1.2.1 Master

主要负责HBase系统的各种管理工作:

  • 处理用户的各种管理请求,包括建表、修改表、权限操作、切分表、合并数据等。
  • 管理集群中的RegionSerer,包括其中Region的负载均衡、迁移等。
  • 清理过期日志以及文件

1.2.2 RegionServer

主要用来相应用户IO请求,是HBase中最核心的模块:

  • WAL(HLog)

    1. 用于实现数据的高可靠性,HBase数据随机写入时,并非直接写入HFile数据文件,而是先写入缓存,再异步刷新落盘。为了防止缓存数据丢失,数据写入缓存之前需要首先顺序写入HLog。这样即使缓存数据丢失,也能够通过HLog日志恢复。
    2. 用于实现HBase集群间的主从复制,通过回放主集群推送过来的HLog日志实现主从复制。
  • BlockCache

    HBase中的读缓存。客户端从磁盘上熟读数据后通常会将数据缓存在系统内存中,后续再次访问相同的一行数据,可以直接从内存中获取,对于大量热点读的业务来说,可以很大提高性能。

    BlockCache缓存对象是一系类Block块,默认64K。利用了空间局部性和时间局部性原理,来实现。

  • Region

    数据表的一个分片,当数据表大小超过一定阈值就会“水平切分”,分裂为两个Region。Region是集群负载均衡的基本单位。通常一张表的Region会分布在整个集群的多台RegionServer上,一个RegionServer会管理多个Region。

    一个Region由一个或者多个Store构成。Store的个数取决于表中列族的个数,多少个列族就有多少个Store。(每个列族的数据都集中存放在一起形成一个存储单元Store)

    每个Store由一个MemStore和多个HFile组成。MemStore成为写缓存,用户写入数据时,会先写到MemStore,当MemStore写满之后(阈值默认为128M),系统会异步将数据flush成一个HFile文件。显然,随着数据不断写入HFile文件越来越多,当HFile文件数超过一定阈值的时候,会执行Compact操作,将小文件通过一定策略合并成一个大文件。

1.3 HBase系统特性

1.3.1 HBase的优点

  • 容量巨大:单表支持千亿行、百万列的数据规模。
  • 良好的可扩展性:集群扩展容易,主要是数据存储节点的扩展以及读写服务节点扩展。(添加RegionServer节点)
  • 稀疏性:允许大量列值为空,并不占用任何存储空间。
  • 高性能:主要擅长OLTP场景,数据写操作性能强劲,对于 随机单点读 以及 小范围扫描读 ,其性能也能得到保障。对于大范围的扫描读可以使用MR的API,以便实现更高效的并行扫描。
  • 多版本:时间戳,可以保留多个历史版本。
  • 支持过期:TTL过期特性,只要设置过期时间,就可以自动清理。
  • Hadoop原生支持

1.3.2 HBase的缺点

  • HBase本身不支持很复杂的聚合运算(如,Join、GroupBy等)。如果业务中需要使用聚合运算,可以在HBase之上架设Phoenix组件(小规模OLTP)或者Spark组件(大规模聚合的OLTP)。
  • HBase本身没有二级索引功能,不支持二级索引查找。好在针对HBase实现的第三方二级索引方案非常丰富,比如目前比较普遍的使用Phoenix提供的二级索引功能。
  • HBase原生不支持全局跨行事务,只支持单行事务模型。同样,可以使用Phoenix提供的全局事务模型组件来弥补HBase的这个缺陷。

二、基础数据结构与算法

2.1 跳跃表

跳跃表是一种能高效实现插入、删除、查找的内存数据结构,这些操作期望复杂度都是O(logN)。

2.2 LSM树

LSM树(Log-Strucured Merge-Tree)本质上和B+树一样,是一种磁盘数据的索引结构。但和B+树不同的是,LSM树的索引对写入请求更友好。

LSM树的索引一般由两部分组成:

  • 内存部分:采用跳跃表来维护一个有序的KeyValue集合。
  • 磁盘部分:由多个内部KeyValue有序的文件组成。

2.2.1 KeyValue存储格式

一般来说,LSM中存储的是多个KeyValue组成的集合,每一个KeyValue一般都会用一个字节数组来表示。

HBase为例,其中,RowkeyFamilyQualifier(列族下的列)、TimestampType这5个字段组成KeyValue中的key部分(Key二进制内容,表示版本号64位long值,HBase中表现为timestamp,type三个必不可少)。Value部分直接存储这个KeyValue中Value的二进制内容。

其中type字段表示这个KeyValue操作的类型,这表明了LSM树内存储的不只是数据,而是每一次记录。

2.2.2 多路并归

类似归并排序算法一样,来合并多个有序文件成一个大文件。

2.2.3 LSM树的索引结构

分为内存部分和磁盘部分,本质是将写入操作全部转化成磁盘的顺序写入,极大地提高了写入操作的性能。但是对读取操作不友好,因为需要在读取的过程中,通过归并所有文件来读取所对应的KV,非常耗资源。因此HBase中设计了异步compaction来降低文件个数。

2.3布隆过滤器

场景

用来判断一个元素是否存在于一个集合中(当集合很大很大远远超出内存的时候)。

原理

布隆过滤器由一个长度为N的01数组组成。一开始全部初始化为0,然后对集合A中的每个元素进行K次Hash并对长度取模后获得K个下标索引,然后把这些下标对应的数组元素置为1。

现在需要判断w是否存在于集合A中,只要把w用上述方法一样K次Hash,每次的结果去数组里面查找,只要有一个值为0就说明这个元素不存在。如果全部为1,只能说明可能存在(假设x经过3次hash后,下标为1、2、6;y为3、4、5,这时候w的hash结果是1、2、3。虽然都是为1,但是属于2个元素)

N=K*|A|/ln2 ,使用这个公式可以保证最佳的误判率。N为数组长度,K为Hash次数,|A|为集合中元素个数。

HBase与布隆过滤器

在HBase 1.X中有3中类型:

  • NONE:不启用
  • ROW:按照rowkey来计算布隆过滤器的二进制串并存储。
  • ROWCOL:按照rowkey+family+qualifier这3个字段拼出byte[]来计算布隆过滤器值并存储。如果在查询的时候,get能指定这3个字段,那么布隆过滤器肯定能提高性能。但是如果缺少任何一个,则无法提升性能。

腾讯团队介绍了一种设计,他们游戏业务rowkey的设计:

rowkey=< userid>#< other-field>

即按照userid来做前缀扫描。(前缀固定,所以计算布隆过滤器的key值也就固定)

总结

布隆过滤器对Get和基于前缀扫描的Scan都非常友好。

三、HBase依赖服务

3.1 ZooKeeper

关于Zookeeper内容可以参考另一篇文章(ZooKeeper学习笔记)。现在来看一下HBase在ZK上创建的内容:

Code
[zk: localhost:2181(CONNECTED) 1] ls /hbase
[meta-region-server, rs, splitWAL, backup-masters, flush-table-proc, master-maintenance, online-snapshot, switch, master, running, draining, namespace, hbaseid, table]
  • meta-region-server:存储HBase集群元数据hbase:meta元数据表所在的RegionServer访问地址。

  • master/backup-masters:主/备管理节点,防止单点故障。

  • table:集群中所有表信息。

  • splitWAL:分布式故障恢复。

  • rs:集群中所有运行的RegionServer。

  • 等等。。。

3.2 HDFS

HDFS读写流程,具体详情参考文章(Hadoop学习笔记(二)HDFS)。

HDFS在HBase中扮演的角色

  • HBase本身不存储文件,它只规定文件格式以及文件内容,实际文存储件由HDFS实现。
  • HBase不提供机制保证存储数据高可靠性,数据的高可靠性由HDFS多副本保证。

3.3 HBase在HDFS中的文件布局

Code
drwxr-xr-x   - hadoop supergroup          0 2020-05-29 02:05 /hbase/.hbck
drwxr-xr-x - hadoop supergroup 0 2020-05-29 04:01 /hbase/.tmp
drwxr-xr-x - hadoop supergroup 0 2020-06-18 21:15 /hbase/MasterProcWALs
drwxr-xr-x - hadoop supergroup 0 2020-05-29 04:01 /hbase/WALs
drwxr-xr-x - hadoop supergroup 0 2020-06-03 17:31 /hbase/archive
drwxr-xr-x - hadoop supergroup 0 2020-05-29 02:05 /hbase/corrupt
drwxr-xr-x - hadoop supergroup 0 2020-05-29 02:05 /hbase/data
-rw-r--r-- 3 hadoop supergroup 42 2020-05-29 02:05 /hbase/hbase.id
-rw-r--r-- 3 hadoop supergroup 7 2020-05-29 02:05 /hbase/hbase.version
drwxr-xr-x - hadoop supergroup 0 2020-05-29 02:05 /hbase/mobdir
drwxr-xr-x - hadoop supergroup 0 2020-06-18 21:15 /hbase/oldWALs
drwx--x--x - hadoop supergroup 0 2020-05-29 02:05 /hbase/staging

四、HBase客户端

4.1 客户端与服务端交互流程

  1. 获取Configuration对象

    一般需要3个配置文件:hbase-size.xml、core-site.xml、hdfs-site.xml放到JVM可以加载到的地方

    java
    Configuration conf = HBaseConfiguration.create();
  2. 通过Configuration初始化Connection

    Connection是HBase客户端一切操作的基础,维持了客户端到整个HBase集群的连接。

    例如,集群里有2个Master、5个RegionServer。那么Connection会维持一个到Active Master的TCP和5个到RegionServer的TCP。

    通常,一个进程只需要为一个独立的集群建立一个Connection即可,并不需要连接池

    java
    Connection conn = ConnectionFactory.createConnection(conf);
  3. 通过Connection初始化Table

    Table是一个非常轻量级的对象,实现了用户访问表的所有API操作,如:Put、Get、Delete、Scan等。本质上,它使用的连接资源、配置信息、线程池、Meta缓存等,都来自Connection对象,因此由同一个Connection创建的多个Table,都可以共享上述这些资源。

    注意:branch-1以及之前版本,Table不是线程安全的类,不建议共享一个Table实例。但是,HBase2.0.0之后的版本,Table已经实现为线程安全类。可以通过同一个Connection为每个请求创建一个Table,但是要记得关闭Table对象。

    java
    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);
  4. 通过Table执行Put和Scan操作

    put(这里注意到参数都是byte[]):

    java
    // 通过bytes工具类创建字节数组(将字符串)
    byte[] rowId = Bytes.toBytes("row3");
    // 创建put对象
    Put put = new Put(rowId);
    byte[] f1 = Bytes.toBytes("f1");
    byte[] id = Bytes.toBytes("id");
    byte[] value = Bytes.toBytes(102);
    put.addColumn(f1, id, value);
    // 执行插入
    table.put(put);

    scan:

    java
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
    try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
    // scan设置
    Scan scan = new Scan();

    try (ResultScanner scanner = table.getScanner(scan)) {
    // 每次scanner.next()返回一个结果
    for (Result result : scanner) {
    // 这里可以看出每一个result中的cell都是相同的rowKey
    final String rowKey = Bytes.toString(result.getRow());
    System.out.println(rowKey);
    // 1个result结果包含N个cells
    for (Cell cell : result.listCells()) {
    System.out.print("[" +
    Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + ":" +
    Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()) + "]" + "\t"
    );
    }
    System.out.println("------------------------------------------");
    }
    }
    }
    }

4.2 定位Meta表

HBase一张表由多个Region构成,这些Region分布在不同RegionServer上。客户端在做任何操作的时候,要先确定数据在哪个Region上,然后根据Region的RegionServer信息,去对应的RegionServer上读取数据。因此,有一张特殊的表(hbase:meta),来存放整个集群所有Region的信息。

hbase:meta表始终只有一个Region,这是为了确保meta表多次操作的原子性,因为hbase只支持Region级别的事务。

4.2.1 Meta表构成

总体来说hbase:meta的一个rowkey就对应一个Region。

  • rowkey 主要由4部分组成

    • TableName(业务表明,带命名空间)
    • StarRow(业务表Region区间的起始rowkey)
    • Timestamp(Region创建的时间戳)
    • EncodedName(上面3个字段的MD5 Hex值)
    Code
    ns1:t1,,1592659846948.9c7ccaa30b993abc7d60a2d790289e51.
  • 每一行数据又主要分为4列

    • info:regioninfo:对应的Value存储4个信息

      Code
      column=info:regioninfo, timestamp=1592659847921, value={
      // EncodedName
      ENCODED => 9c7ccaa30b993abc7d60a2d790289e51,
      // RegionName
      NAME => 'ns1:t1,,1592659846948.9c7ccaa30b993abc7d60a2d790289e51.',
      // Region的StarRow和StopRow
      STARTKEY => '',
      ENDKEY => ''
      }
    • info:seqnumDuringOpen:存储Region打开时的sequenceId

      Code
      column=info:seqnumDuringOpen, timestamp=1592659847921, value=\x00\x00\x00\x00\x00\x00\x00\x02
    • info:server:存储Region落在哪个RegionServer上

      Code
      column=info:server, timestamp=1592659847921, value=hadoop001:16020
    • info:serverstartcode:所在RegionServer的启动时间戳

      Code
      column=info:serverstartcode, timestamp=1592659847921, value=1592658086718

4.2.2 热点Region问题

之前说到所有的操作要确定Region的位置,而这些位置信息都在hbase:meta表上。所有请求都访问这个表,那么会产生热点问题。解决方案就是将Region信息缓存在客户端。

avatar

以上就是客户端定位Region示意图,在客户端有一个MetaCache的缓存,客户端调用API时,会先去缓存中找业务rowkey所在的Region,情况可能如下:

  • Region信息为空:即缓存中没有这个Region信息,那么要去hbase:meta中查找(当然首次查找,需要到ZK集群获取hbase:meta所在的RegionServer)。然后返回一个二元组(regionStartRow, region)放到MetaCache中。
  • Region信息不为空,但是RPC请求对应的RegionServer上并没有该Region信息:说明缓存信息过期了,可能是Region迁移了,这时候就要重新去hbase:meta表找了。
  • Region信息不为空,且RPC请求对应的RegionServer上存在该Region信息:这就是大部分情况。

4.3 Scan 操作

常用的有:startRow、endRow、Filter、caching、batch、reversed、maxResultSize、version、timeRange等。

我们已经知道可以通过 table.getScanner(scan) 可以拿到一个scanner,然后不断执行scanner.next()就能拿到一个Result。

通常一个Result内部有多个cell,这些cell的rowkey一样,就相当于一行完整的数据。如果设置了setBatch那么就有可能是一行的一部分数据,即有batch个cell。

4.3.1 客户端读取Result流程

avatar

  1. 每次执行scanner.next()都会去cache队列中拿一个result(步骤4)

  2. 如果cache队列为空,那么发起RPC请求当前scanner后续的result数据(步骤1)

  3. 客户端收到result列表后(步骤2)

  4. 通过scanResultCache把这些result内的多个cell进行重组,最终组成用户需要的result放入cache中(步骤3)

    为什么要重组?因为RPC请求有限制,达到某个资源阈值就会立即返回当前获取到的cell,即可能无法一次拿那么多数据,那么这时候从服务器返回的result就有可能是一行中的部分数据,而我们需要的result是一行完整的数据。这时候就需要对result进行重组,来获得我们期望的数据。

以上步骤1+2+3,称为loadCache。

4.3.2 Scan的几个重要概念

  • caching:每次loadCache操作最多存放caching个result到cache队列中。控制caching也就可以控制每次loadCache向服务器请求的数据量,避免出现某一次next()操作耗时极长。
  • batch:用户拿到的result中最多含有一行数据中的batch个cell。
  • allowPartial:跳过重组过程,直接把收到的服务端result返回给用户。
  • maxResultSize:loadCache单次RPC操作最多拿到maxResultSize字节的结果集。

4.4 HBase 客户端避坑指南

4.4.1 RPC重试配置要点(几个超时参数)

  • hbase.rpc.timeout:单次RPC请求的超时时间(默认60 000ms)
  • hbase.client.retries.number:调用API时最多容许多生多少次RPC重试操作(默认35次)
  • hbase.client.pause:连续两次RPC重试之间的休眠时间(默认100ms。注意采用的是退避算法,也就是说重试次数越多,则休眠时间越长)
  • hbase.client.operation.timeout:单次API(即get/put/delete等操作)的超时时间(默认1 200 000ms)

4.4.2 Scan Filter 设置

  • PrefixFilter(前缀过滤器)

    会返回前缀为指定内容的rowkey,但是不高效,因为会对(-∞,rowkey)区间的内容全部扫描。解决方法是可以设置一个setStartRow(),RegionServer发现有这个属性会首先寻址定位到这个startRow,然后从这个位置开始扫描数据,这样就跳过了大量数据。

    不过,更简单的是直接设置setStartRow和setStopRow,即区间直接定位。这样效率最高

  • PageFilter(分页过滤器)

    用来限制返回的数据数量的。

    注意,HBase里Filter状态全部都是Region内部有效的。如果扫描的数据,一旦从一个Region切换到另一个Region那么之前那个Filter的内部状态就无效了(即计数器清0)。新的Region内部用的是一个新的Filter。

    当然,如果想实现分页功能,可以直接通过limit实现:setLimit(1000);

4.4.3 少量写和批量写

  • table.put(put):单行数据写入,在服务端先写WAL,然后邪写到MenStore,一旦写满flush到磁盘上。吞吐量受限于磁盘带宽、网络带宽、flush速度。但是保证不会丢数据,保证put的原子性。
  • table.put(List< Put> puts):批量写入,在客户端缓存put,凑足一批后打包成一次RPC发送到服务端,一次性写WAL,和MenStore。耗时会长一点,另外如果put分布在多个Region内,可能有一部分会失败(HBase不提供跨Region的多行事务),失败的会重试。
  • bulk load:直接将带写入数据生成HFile,直接加载到对应的Region下的CF内。在HBase服务器端没有任何RPC只有load HFile时会调用,是一种完全离线的快速写入方式。

五、RegionServer的核心模块

RegionServer的内部结构,在之前已经了解过了。现在我们来逐个解析。

5.1 HLog

HBase中系统故障恢复及主从复制都基于HLog实现。

5.1.1 HLog文件结构

avatar

每个RegionServer拥有一个或多个HLog(默认1个,1.1以后允许多个)。每个HLog是多个Region共享的。

5.1.2 HLog文件存储

在HDFS上的目录:

Code
/hbase/WALs/hadoop001,16020,1592727464395/hadoop001%2C16020%2C1592727464395.1592727473513

有3个部分:hadoop001是RegionServer域名,16020端口号,1592727464395目录生成时间戳。

可以使用命令查看内容:hbase hlog (但是我这个版本好像没有hlog,而是wal)

Code
[hadoop@hadoop002 /home/hadoop/app/hbase-2.2.4/logs]$hbase wal /hbase/WALs/hadoop002,16020,1592729053980/hadoop002%2C16020%2C1592729053980.1592729063117
Writer Classes: ProtobufLogWriter AsyncProtobufLogWriter
Cell Codec Class: org.apache.hadoop.hbase.regionserver.wal.WALCellCodec
Sequence=34, table=ns1:t1, region=9c7ccaa30b993abc7d60a2d790289e51, at write timestamp=Sun Jun 21 16:44:25 CST 2020
row=\x00, column=METAFAMILY:HBASE::REGION_EVENT::REGION_OPEN
cell total size sum: 320
edit heap size: 360
position: 408

5.1.3 HLog生命周期

  1. HLog构建:HBase任何写入(更新、删除)操作都会先把记录追加写入HLog文件中。

  2. HLog滚动:HBase后台启动一个线程,每隔一段时间(’hbase.regionserver.logroll.period‘决定,默认1小时)进行日志滚动。滚动的主要目的是方便过期日志以文件形式删除。

  3. HLog失效:写入数据一旦从MemStore落盘,那么对应的日志数据就失效了。只要日志文件中所有日志记录都已经落盘,那么该文件失效。一旦失效,被移动到oldWALs文件夹。(注意此时HLog还没有被删除)

  4. HLog删除:Mater后台会启动一个线程,每隔一段时间(’hbase.master.cleaner.interval‘,默认1分钟)检查一次oldWALs文件夹下的所有失效日志文件,确定可以删除后执行删除:(确认条件主要有2个)

    • 该HLog文件是否还在参与主从复制

    • 该HLog文件是否已经在oldWALs目录中存在10分钟(可以通过’hbase.master.logcleaner.ttl‘设置,默认10分钟)

      有个奇怪的地方,我的集群内,oldWALs并不会自动清理!?查阅资料后添加参数也没有效果。即,存在一大堆 /hbase/oldWALs/pv2-00000000000000000596.log 这样的文件,它们生成的时间间隔是1小时,可以看出是HLog滚动的1小时。

5.2 MemStore

HBase没有直接使用原始的跳跃表,而是用了JDK自带的ConcurrentSkipListMap,此外这是线程安全的。

MemStore由两个ConcurrentSkipListMap实现,写入操作会先写入A,当A中数据量达到阈值之后,会创建B来接受用户新的请求,而已经写满的A,会执行异步flush落盘形成HFile。

MemStore的GC问题

既然是缓存,那么就会涉及到GC问题。MemStore本身会占用大量内存,因此GC问题不可避免。而且MemStore的工作模式会引起严重的内存碎片问题。

一个RegionServer有多个Region构成,一个Region有多个MemStore。而这些所有的MemStore会共享内存,即数据混合在一起。因此如果有一个Region执行落盘操作,那么会产生大量内存碎片。

MSLAB内存管理方式

为了解决上述内存碎片可能导致的Full GC。借鉴了线程本地分配缓存的内存管理方式:

  1. 每个MemStore会实例化得到一个MemStoreLAB对象
  2. MemStoreLAB会申请一个2M大小的Chunk数组,同时维护一个Chunk偏移量,初始为0
  3. 当一个KeyValue值插入MemStore后,MemStoreLAB会把data数组复制到Chunk数组中,然后移动偏移量
  4. 当前Chunk满了之后,再申请一个新的Chunk

这样通过整块的方案即将一个Region的数据放到一起,可以很大程度减少内存碎片。但是这还存在一些小问题。

MemStore Chunk Pool

当Chunk写满之后,系统会重新申请一个新的Chunk,新建Chunk对象会再JVM的新生代申请内存,如果申请比较频繁会导致JVM的新生代Eden区满掉,触发YGC。

MemStore Chunk Pool 的思路:

  1. 系统创建一个Chunk Pool来管理所有未被引用的Chunk,这些Chunk就不会再被JVM当作垃圾回收。
  2. 如果一个Chunk没有再被引用,将其放入Chunk Pool
  3. 如果当前Chunk Pool已经达到容量最大值,就不会再接纳新的Chunk
  4. 如果需要申请新的Chunk来存储KeyValue,首先从Chunk Pool中获取,如果有就重复利用,没有就申请一个新的Chunk。

5.3 HFile

5.3.1 逻辑结构(V2)

avatar

HFile文件主要有4个部分:

  • Scanned Block 部分:表示顺序扫描的时候数据块将会被读取。

    这个部分包含3个数据块:

    1. Data Block:存储用户KeyValue数据
    2. Leaf Index Block:存储索引树的叶子节点数据(索引树有3层,这是最后一层)
    3. Bloom Block:存储布隆过滤器相关数据
  • Non-scanned Block 部分:再HFile顺序扫描时候数据不会被读取

    主要包括两部分:

    1. Meta Block
    2. Intermediate Level Data Index Blocks(索引树第二层)
  • Load-on-open 部分:这部分数据会在RegionServer打开HFile时直接加载到内存中

    包括:

    1. FlieInfo:固定长度数据块,主要记录文件的一些统计元信息,比较重要的是AVG_KEY_LEN和AVG_VALUE_LEN分别表示平均Key和Value的长度
    2. 布隆过滤器MetaBlock:记录布隆过滤器相关元数据信息
    3. RootDataIndex(索引树第一层,即索引树根节点信息)
    4. Meta Index Block
  • Trailer 部分:主要记录了HFile的版本信息、其他各个部分的偏移值和寻址信息。

5.3.2 物理结构

avatar

HFile文件由各种不同类型的Block(数据块)构成,虽然这些Block的类型不一样,但是数据结构相同

Block的大小可以创建列族的时候加上参数 blocksize=>’65535’ 指定,默认是64K。大号Block有利于大规模顺序扫描,而小号Block有利于随机查找。

HFileBlock 结构

avatar

HFileBlock主要包含两部分:

  • BlockHeader:存储相关元数据

    • BlockType:最核心字段,表示Block的类型。HBase定义了8种BlockType类型,核心如下:

      avatar

  • BlockData:存储具体数据

5.3.3 HFile基础Block

1) Trailer Block

主要记录了HFile的版本信息、各个部分的偏移量和寻址地址信息。

avatar

RegionServer在打开HFile时,会加载所有HFile的Trailer部分以及load-on-open部分到内存中。实际加载过程首先会解析Trailer Block,然后进一布加载load-on-open。具体步骤如下:

  1. 加载 HFile version 版本信息,包含majorVersion和minorVersion(主版本V1、V2还是V3,次版本信息),不同版本使用不同的文件解析器对HFile进行读取解析。
  2. HBase根据version信息计算Trailer Block大小,加载整个HFile Trailer Block到内存中。其中有很多信息,如上图。
  3. 然后根据其中两个重要字段:LoadOnOpenDataOffset和LoadOnOpenDataSize,前者表示偏移量,后者表示大小。HBase会在启动后讲load-on-open部分的数据全部加载到内存中。
2) Data Block

Bata Block 是 HBase中文件读取的最小单元。Data Block中存储用户的KeyValue(HBase存储的核心),HBase中所有数据都是以KeyValue结构存储在HBase中。

avatar

KeyValue结构

其中key length 和 value length 是两个固定长度的数值。value是实际写入的数据。

key由多个部分组成,从上图可以看出有各个部分。其中KeyType类型有4种:Put、Delete、DeleteColumn、DeleteFamily

从这里可以看出,每个KeyValue都包含rowkey、column family、column qulifier。因此在表结构设计时,这些字段尽可能设置短的原因。

3) 布隆过滤器相关Block

布隆过滤器之前说过了。可以想象HFile文件越大,里面KeyValue存储的越多,那么位数组就相应越大,太大就不适合加载到内存了。因此HFile V2 在设计上将位数组进行拆分成多个独立的位数组(根据Key拆分,一部分连续JKey使用一个位数组)。这样根据Key查询的时候,只要加载相应的位数组,减少内存开销。

在文件结构上多个位数组对应多个Bloom Block(数组+firstkey),为了方便定位对应的位数组,V2又设计了Bloom Index Block:

avatar

HFile种仅有一个Bloom Index Block数据块,位于load-on-open部分。如上图,从大方向上可以分为2部分:一个是HFile种布隆过滤器元数据基本信息(绿色),以及指向Bloom Block的索引项(红色)。

其中Block Key(图上显示FirstKey)是个非常关键的字段,表示该Index Entry指向的Bloom Block种第一个KV的Key值。BlockOffset表示对应Bloom Block在HFile中的偏移量。

get请求根据布隆过滤器查找流程:

  1. 首先根据带查找Key在Bloom Index Block所有的索引项中根据BlockKey进行二分查找。定位到对应的Bloom Index Entry。
  2. 再根据Bloom Index Entry中BlockOffset以及BlockOndiskSize加载该Key对应的位数组
  3. 对Key进行Hash映射,根据映射结果再位数组中查看是否都是1,如果不是,表示不存在Key,否则有可能存在。
4) 索引相关Block

根据索引层级不同,HFile中索引结构分为两种:single-level和mutil-level,前者表示单层索引,后者表示多级索引,一般为两级或者三级。

V2版本的Index Block有两类:

  1. Root Index Block(索引数根节点,属于load-on-open部分)

    avatar

    如图为多层索引的结构(单层索引,仅仅缺少mid的3个字段)。其中Index Entry为具体的索引对象。由3个字段组成:索引指向的DataBlock偏移量DataBlock在磁盘上的大小索引指向DataBlock中第一个的Key

    除此之外的MidKey相关信息,用于在对HFile进行split操作时,快速定位HFile的切分点位置。

  2. NonRoot Index Block

    • Intermediate Index Block(中间节点,属于Non-Scanned block 部分)
    • Leaf Index Block(叶子节点,直接指向实际DataBlock。属于scanned block部分)

    avatar

    一开始,由于HFile刚开始数量小,索引采用单层索引,只有Root Index一层索引,直接指向Data Block。当数据变大的时候,Root Index Block大小超过阈值后,索引分裂为多级结构,由一层索引变为两层,根节点指向叶子节点,叶子节点指向实际Data Block。如果数据更大,索引层变为三层。

NonRootIndexBlock 结构

avatar

不管是中间还是叶子节点,结构都是一样的。和根索引块不同的是多了一个内部索引(entry offset),表示index Entry相对于第一个Index Entry的偏移量。用于实现二分查找,加快速度。

5.3.4 HFile文件查看工具

Code
$hbase hfile
usage: hfile [-a] [-b] [-e] [-f <arg> | -r <arg>] [-h] [-i] [-k] [-m] [-p]
[-s] [-v] [-w <arg>]
-a,--checkfamily Enable family check
-b,--printblocks Print block index meta data
-e,--printkey Print keys
-f,--file <arg> File to scan. Pass full-path; e.g.
hdfs://a:9000/hbase/hbase:meta/12/34
-h,--printblockheaders Print block headers for each block.
-i,--checkMobIntegrity Print all cells whose mob files are missing
-k,--checkrow Enable row order check; looks for out-of-order
keys
-m,--printmeta Print meta data of file
-p,--printkv Print key/value pairs(这个可以直接打印KV内容)
-r,--region <arg> Region to scan. Pass region name; e.g.
'hbase:meta,,1'
-s,--stats Print statistics
-v,--verbose Verbose output; emits file and meta data
delimiters
-w,--seekToRow <arg> Seek to this row and print all the kvs for this
row only

示例:通过shell命令来查看内容,这里引用6.2.2生成的HFile来进行查看:

Code
$hbase hfile -m -f hdfs://mycluster/MRData/out/f1/5314cd3465004280a7ad779dbcbe6a2f

2020-06-22 23:57:39,198 INFO [main] metrics.MetricRegistries: Loaded MetricRegistries class org.apache.hadoop.hbase.metrics.impl.MetricRegistriesImpl
Block index size as per heapsize: 320
reader=hdfs://mycluster/MRData/out/f1/5314cd3465004280a7ad779dbcbe6a2f,
compression=none,
cacheConf=cacheDataOnRead=false,
cacheDataOnWrite=false,
cacheIndexesOnWrite=false,
cacheBloomsOnWrite=false,
cacheEvictOnClose=false,
cacheDataCompressed=false,
prefetchOnOpen=false,
firstKey=Optional[row100/f1:age/1592841003333/Put/seqid=0],
lastKey=Optional[row102/f1:name/1592841003333/Put/seqid=0],
avgKeyLen=23,
avgValueLen=3,
entries=9,
length=5319
Trailer:
fileinfoOffset=531,
loadOnOpenDataOffset=421,
dataIndexCount=1,
metaIndexCount=0,
totalUncomressedBytes=5226,
entryCount=9,
compressionCodec=NONE,
uncompressedDataIndexSize=36,
numDataIndexLevels=1,
firstDataBlockOffset=0,
lastDataBlockOffset=0,
comparatorClassName=org.apache.hadoop.hbase.CellComparatorImpl,
encryptionKey=NONE,
majorVersion=3,
minorVersion=3
Fileinfo:
BLOOM_FILTER_TYPE = ROW
BULKLOAD_SOURCE_TASK = attempt_1592722718442_0004_r_000000_0
BULKLOAD_TIMESTAMP = 1592841004789
DELETE_FAMILY_COUNT = 0
EARLIEST_PUT_TS = 1592841003333
EXCLUDE_FROM_MINOR_COMPACTION = false
KEY_VALUE_VERSION = 1
LAST_BLOOM_KEY = row102
MAJOR_COMPACTION_KEY = true
MAX_MEMSTORE_TS_KEY = 0
TIMERANGE = 1592841003333....1592841003333
hfile.AVG_KEY_LEN = 23
hfile.AVG_VALUE_LEN = 3
hfile.CREATE_TIME_TS = 0
hfile.LASTKEY = row102/f1:name/1592841003333/Put/vlen=0/mvcc=0
hfile.MAX_TAGS_LEN = 0
hfile.TAGS_COMPRESSED = false
Mid-key: Optional[row100/f1:age/1592841003333/Put/seqid=0]
Bloom filter:
BloomSize: 8
No of Keys in bloom: 3
Max Keys for bloom: 6
Percentage filled: 50%
Number of chunks: 1
Comparator: ByteArrayComparator
Delete Family Bloom filter:
Not present

5.4 BlockCache

BlockCache是RegionServer级别的,一个RegionServer只有一个BlockCache,在RegionServer启动的时候完成初始化工作。目前为止有3种实现方案:

  • LRUBlockCache(默认):将数据放入JVM Heap中
  • SlabCache:部分数据存放在堆外,可以缓解系统长事件GC
  • BucketCache:同上

六、HBase读写流程

6.1 HBase写入流程

avatar

6.1.1 客户端处理阶段

  1. 用户提交put请求后,HBase客户端会将写入的数据添加到本地缓冲区中,符合一定条件(阈值大小默认2M)就会通过AsyncProcess异步批量提交。

    注意!HBase默认设置autoflush为true,表示put请求会直接提交给服务器处理。将autoflush设置成false后,可以极大提升写入吞吐量,但是没有保护机制,如果客户端崩溃,会导致已经提交的数据丢失。

  2. 在提交前,HBase会在元数据表hbase:meta中根据rowkey找到它们归属的RegionServer。如果是批量请求,还会把这些rowkey按照HRegionLocation分组,不同分组的请求意味着发送到不同的RegionServer,因此每个分组对应一次RPC请求。

    avatar

    • 首先客户端根据rowkey在元数据缓存中查找,如果能找到该rowkey所在的RegionServer和Region,就可以直接发送写入请求(携带Region信息)到目标RegionServer
    • 如果客户端缓存没有找到对应信息,那么就要去ZK集群获取hbase:mate表所在的RegionServer,然后向该RegionServer发送查询请求,在hbase:meta表中查找rowkey所在的RegionServer以及Region信息,并将结果缓存在客户端。
    • 客户端根据获得的相关元数据信息将写入请求发送给目标RegionServer。

6.1.2 Region 写入阶段

服务端RegionServer接受到客户端的写入请求后,首先会反序列化为put对象,然后执行各种检查。检查后,执行一系列核心操作:

  1. Acquire locks:HBase中使用行锁保证对同一行数据的更新原子性。

  2. Update LATEST_TIMESTAMP timestamp:更新所有待写入(更新)KeyValue的时间戳为当前系统时间。

  3. Build WAL edit:在内存中构建WALEdit对象。(注意!这时候还没有写到HLog

  4. Append WALEdit To WAL:将步骤3中构造的在内存中的WALEdit记录顺序写入HLog中,此时不需要sync(同步到HDFS)操作。

    1. HLog持久化等级

    • SKIP_WAL:只写缓存,不写HLog。有数据丢失风险。
    • ASYNC_WAL:异步将数据写入HLog日志中。
    • SYNC_WAL:同步将数据写入日志文件中,注意数据只是写入文件系统,并没有真正落盘。还要看HDFS Flush策略。
    • FSYNC_WAL:同步将数据写入日志文件并强制落盘。
    • USER_DEFAULT:默认使用SYNC_WAL

    2. HLog写入模型

    HLog写入都要经过3个阶段:首先将数据写入本地缓存,然后将本地缓存写入文件系统,最后执行sync操作同步到磁盘

    当前版本使用了LMAX Disruptor框架实现了无锁有界队列操作。

  5. Write back to MemStore:写入WAL之后,再将数据写入MemStore。

    MemStore使用数据结构ConcurrentSkipListMap来实际存储KeyValue。

    根据之前的知识可以知道HBase使用了MSLAB机制,预先申请一个大的(2M)Chunk内存,避免严重的内存碎片问题而触发的Full GC。写入的KeyValue会进行一次封装,顺序拷贝到这个Chunk中。因此MemStore写入过程可以归为3步:

    1. 检查当前Chunk是否写满,如果写满,重新申请一个2M的Chunk
    2. 将当前KeyValue在内存中重新构建,在可用Chunk的指定offset处申请内存创建一个新的KeyValue
    3. 将新创建的KeyValue对象写入ConcurrentSkipListMap中。
  6. Release row locks:释放行锁。

  7. Sync wal:HLog真正sync到HDFS,再释放行锁之后执行sync操作是为了尽量减少持锁时间,提升写性能。如果sync失败,执行回滚操作将MemStore中已经写入的数据移除

  8. 结束写事务:此时该线程的更新操作才会对其他读请求可见,更新才实际生效。

6.1.3 MemStore Flush 阶段

当MemStore越来越大,达到一定的条件的时候,就会触发flush操作:

1) 触发条件
  • MemStore级别限制:Region中任意一个MemStore大小达到上限(hbase.hregion.memstore.flush.size,默认128MB)

  • Region级别限制:当Region中所有MemStore的大小总和达到上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size)

  • RegionServer级别限制:当RegionServer中MemStore的大小总和超过低水位阈值(hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size)开始强制执行flush,先flush MemStore最大的Region,再flush次大的。

    如果此时写入吞吐量依然很高,导致总MemStore大小超过高水位阈值hbase.regionserver.global.memstore.size,RegionServer会阻塞更新并强制执行flush,直至总MemStore大小下降到低水位阈值

  • 当一个RegionServer中HLog数量达到上限(hbase.regionserver.maxlogs)

  • HBase定期刷新MemStore(默认周期1小时,确保MemStore不会长时间没有持久化)

  • 手动执行flush(可以通过shell命令:flush ‘tablename’ 或者 flush ‘regionname’分别对表和Region进行flush)

一般情况下flush不会对系统有太大的影响,只有RegionServe级别的会阻塞所有落在该RegionServer上的写入操作。直到MemStore数据量降低到配置阈值内。

2) 执行流程

为了减少flush过程对读写的影响(写数据的时候会写到MemStore,而再flush的时候数据不能改变),HBase采用了类似于两个阶段提交的方式,将flush过程分为3个阶段:

  1. prepare阶段:遍历当前Region中所有的MemStore,将MemStore中当前数据集CellSkipListSet(内部实现采用ConcurrentSkipListMap)做一个快照snapshot,然后再新建一个CellSkipListSet接受新的数据写入(即5.2中说的AB两个跳跃表)。

  2. flush阶段:遍历所有MemStore,将prepare阶段生成的snapshot持久化为临时文件(第3点介绍具体流程),这个过程涉及磁盘IO操作,因此相对耗时。

  3. commit阶段:遍历所有的MemStore,将flush阶段生成的临时文件移到指定的ColumnFamily目录下(应该就是HDFS目录 /hbase/data/命名空间/表名/region名/列族名 下),针对HFile生成对应的storeflie(实际上StoreFile就是对HFile做了轻量级包装,即StoreFile底层就是HFile)和Reader,把storefile添加到Store的storefiles列表中,最后再清空prepare阶段生成的snapshot。

3) HFile 文件构建流程

HBase执行flush操作之后将内存中的数据按照特定格式写成HFile文件,HFile文件中各个Block部分的构建流程如下:(文件结构在5.3中已经说明)

  1. 构建“Scanned Block”部分

    1. MemStore执行flush,首先新建一个Scanner,这个Scanner从存储KV数据的CellSkipListSet中依次从小到大读出每个cell(KeyValue)

    2. appendGrneralBloomFilter:在内存中使用布隆过滤器算法构建Bloom Block,下文也称为(Bloom Chunk)。

    3. appendDeleteFamilyBloomFilter:针对标记为”DeleteFamily”或者”DeleteFamilyVersion”的cell,在内存中使用布隆过滤器算法构建Bloom Block,基本流程和appendGrneralBloomFilter相同。

    4. (HFile.Write)writer.append:将cell写入Data Block中,这是HFile构建的核心

  2. 构建 Bloom Block

    布隆过滤器内存中维护了多个称为chunk(即Block,具体实现叫Chunk)的数据结构,一个chunk的组成:

    • 一块连续的内存区域,主要存储一个特定的数组。默认数组中所有位都是0。对于row类型的布隆过滤器,cell进来之后会对其rowkey执行hash映射,将其映射到位数组某一位,并修改值为1。
    • fistkey,第一个写入该chunk的cell的rowkey,用来构建Bloom Index Block(可以看5.3.3第3点图中左边红色部分)。

    cell写进来之后,首先判断当前chunk是否已经写满(chunk个数是否超过阈值)。如果超过阈值,会重新申请一个新的chunk,并将当前chunk加入ready chunks集合中。如果没有写满,则根据布隆过滤器算法使用多个hash函数分别对cell的rowkey进行映射,并修改值为1。

  3. 构建 Data Block

    一个cell在内存中生成对应的布隆过滤器信息(即,位数组置1)之后就会写入DataBlock,写入过程分两步:

    1. Encoding Key Value:使用特定编码对cell进行编码处理。

      编码思路:根据上一个KeyValue和当前KeyValue比较之后取delta(即,分别对rowkey、column family、 column进行比较然后取delta)。假如前后两个前后两个KeyValue的rowkey相同,当前rowkey就可以用一个特定的flag标记,不需要完整地存储整个rowkey。这样在某些场景下可以极大减少存储空间。

    2. 将编码后的KeyValue写入DataOutputStream

      随着cell不断写入,当前Data Block会因为大小超过阈值(默认64KB)而写满。写满后Data Block会将DataOutputStream的数据flush到文件,该Data Block此时完成落盘。

  4. 构建 Leaf Index Block

    Data Block落盘之后会立刻在内存中构建一个Leaf Index Entry对象,并加入到当前Leaf Index Block中。Leaf Index Entry对象有三个重要字段:

    • firstKey:落盘Data Block的第一个key。用来作为索引节点的实际内容。
    • blockOffset:落盘Data Block在HFile文件中的偏移量。用于定位Data Block。
    • blockDataSize:落盘Data Block的大小。用于定位之后加载数据。

    同样的,Leaf Index Block随着Entry的不断写入慢慢变大,一旦超过阈值(64KB),就要flush到文件执行落盘。需要注意的是,Leaf Index Block落盘是追加写入文件的,所以会形成HFile中Data Block和Leaf Index Block交叉出现的原因。

    和Data Block一样,在Leaf Index Block落盘之后,还需要往上构建Root Index Entry并写入Root Index Block,形成索引树的根节点。但是根节点没有追加写入到”Scanned block”部分,而是最后写入”load-on-open”。

  5. 构建 Bloom Block Index

    完成Data Block落盘还有一件非常重要的事情:检查是否有已经写满的Bloom Block。如果有,将该Bloom Block追加写入文件(在第二步中只是把chunk加到一个集合中),在内存中构建一个Bloom Index Entry并写入Bloom Index Block。

  6. 基本流程总结

    flush阶段生成的HFile和Compaction阶段生成的HFile流程完全相同,不同的是flush读取的MemStore中的KeyValue,而Compaction读取的是多个HFile中的KeyValue写成一个大的HFile,即KeyValue来源不同。

    首先从MemStore中获取KeyValue,然后根据KeyValue通过布隆过滤器算法生成Bloom Block加到ready chunks集合中,然后写入Data Block。一旦Data Block写满,就要将其落盘,同时构造一个索引Leaf Index Entry 加到 Leaf Index Block中,直到写满落盘(追加到Data Block之后)时,再构造一个索引Root Index Entry 加到 Root Index Block(后期写入”load-on-open”)。紧接着Data Block落盘,在raedy chunks中的chunk也要落盘(即,Bloom Block落盘),也是追加写入的。然后构建该Bloom Block的索引 Bloom Index Entry 加到 Bloom Index Block

    实际上,每写入一个KeyValue就会动态的去构建“Scanned Block”部分,等所有KeyValue都写入完成之后再再静态地构建“Non-scanned Block”部分、“Load on open”部分以及“Trailer”部分。

6.2 BulkLoad 功能

有这么一个场景:

用户数据位于HDFS中,业务需要定期将这部分数据海量导入HBase系统,以执行随机查询更新操作。这种场景如果调用API操作的话,会给RegionServer带来极大的压力。

  • RegionServer频繁flush,不断compact、split。影响稳定性。
  • RegionServer频繁GC。
  • 消耗大量CPU、带宽、内存、IO资源,与其他资源产生竞争。
  • 某些场景下,比如平均KV大小比较大,会耗尽RegionServer的处理线程,导致集群阻塞。

在4.4.3中提到过。bulk load:直接将带写入数据生成HFile,直接加载到对应的Region下的CF内。在HBase服务器端没有任何RPC只有load HFile时会调用,是一种完全离线的快速写入方式。

6.2.1 核心流程

  1. HFile生成阶段

    这个阶段会运行一个MapReduce任务,mapper需要自己来实现。将HDFS文件中的数据读出来组装成一个复合KV,其中Key是rorkey,Value可以是KeyValue对象、Put对象甚至是Delete对象;reducer由HBase负责,通过HFileOutputFormat2.configureIncrementalLoad()进行配置,这个方法负责:

    • 根据表信息配置一个全局有序的partitioner
    • 将partitioner文件上传到HDFS集群并写入DistributedCache
    • 设置 reduce task 的个数为目标Region的个数
    • 设置输出key/value类满足HFileOutputFormat所规定的格式要求
    • 根据类型设置reducer执行相应的排序(KeyValueSortReducer或者PutSortReducer)

    这个阶段会为每个Region生成一个对应的HFile文件

  2. HFile导入阶段

    在HFile准备就绪后,就可以使用工具completebulkload将HFile加载到在线HBase集群。completebulkload工具负责:

    • 依次检查第一步生成的所有HFile文件,将每个文件映射到对应的Region
    • 将HFile文件移动到对应Region所在的HDFS文件目录下
    • 告知Region对应的RegionServer,加载HFile文件对外提供服务

avatar

6.2.2 基础案例

  1. 生成HDFS上的数据源

    BulkLoad的数据源一定是在HDFS上的文件!如果是其他地方的数据要转换成HDFS上的文件。

    Code
    // 首先我们创建一个文件上传到HDFS中
    [BulkLoadData.txt]
    100 zhangsan 16
    101 wangwu 11
    102 lisi 30

    $hdfs dfs -mkdir /MRData
    $hdfs dfs -put BulkLoadData.txt /MRData/BulkLoadData.txt
    $hdfs dfs -ls /MRData/
    Found 1 items
    -rw-r--r-- 3 hadoop supergroup 42 2020-06-22 22:08 /MRData/BulkLoadData.txt
  2. 使用MR Job Driver程序将源文件转化为HFile

    java
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import java.io.IOException;

    /**
    * BulkLoadDemo class
    *
    * @author BoWenWang
    * @date 2020/6/22 22:12
    */
    public class BulkLoadDemo {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // 获取运行参数(其中至少一个输入路径以及一个输出路径)
    Configuration conf = HBaseConfiguration.create();
    String[] remainingArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
    if (remainingArgs.length < 2) {
    System.err.println("Usage: BulkLoadDemo <in> <out>");
    System.exit(2);
    }
    // HBase
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(TableName.valueOf("ns1:t1"));
    // 创建Job任务
    Job job = Job.getInstance(conf, "BulkLoadDemo");
    // 设置Jar包运行的类
    job.setJarByClass(BulkLoadDemo.class);
    // (核心)设置Map
    job.setMapperClass(BulkLoadMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    // (核心)通过HFileOutputFormat2.configureIncrementalLoad对Reduce进行配置
    HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("ns1:t1")));
    // 设置输入输出数据类型
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    // 指定数据输入、输出路径
    for (int i = 0; i < remainingArgs.length - 1; i++) {
    FileInputFormat.addInputPath(job, new Path(remainingArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(remainingArgs[remainingArgs.length - 1]));
    // 将job提交给集群运行
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
    * Mapper程序
    */
    public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    public static final byte[] ID_QUALIFIER = "id".getBytes();
    public static final byte[] NAME_QUALIFIER = "name".getBytes();
    public static final byte[] AGE_QUALIFIER = "age".getBytes();
    public static final byte[] COLUMN_FAMILY = "f1".getBytes();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String[] fields = line.split(" ");

    byte[] rowKey = ("row" + fields[0]).getBytes();
    byte[] id = fields[0].getBytes();
    byte[] name = fields[1].getBytes();
    byte[] age = fields[2].getBytes();

    // 存储的数据
    final ImmutableBytesWritable putRowKey = new ImmutableBytesWritable(rowKey);
    Put put = new Put(rowKey).addColumn(COLUMN_FAMILY, ID_QUALIFIER, id)
    .addColumn(COLUMN_FAMILY, NAME_QUALIFIER, name)
    .addColumn(COLUMN_FAMILY, AGE_QUALIFIER, age);

    context.write(putRowKey, put);
    }
    }
    }

    然后打包jar到集群上运行:

    Code
    $hadoop jar HBase2-1.0-SNAPSHOT.jar BulkLoadDemo hdfs://mycluster/MRData hdfs://mycluster/MRData/out

    然后查看结果:

    Code
    $hdfs dfs -ls -R /MRData
    -rw-r--r-- 3 hadoop supergroup 42 2020-06-22 22:08 /MRData/BulkLoadData.txt
    drwxr-xr-x - hadoop supergroup 0 2020-06-22 23:50 /MRData/out
    -rw-r--r-- 3 hadoop supergroup 0 2020-06-22 23:50 /MRData/out/_SUCCESS
    drwxr-xr-x - hadoop supergroup 0 2020-06-22 23:50 /MRData/out/f1
    -rw-r--r-- 3 hadoop supergroup 5319 2020-06-22 23:50 /MRData/out/f1/5314cd3465004280a7ad779dbcbe6a2f

注意!可能会报错NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration,原因是没有导入hbase相关的包。因此要在hadoop-env.sh中添加如下内容:

Code
# 改成你自己对应的hbase目录
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/hadoop/apps/hbase/lib/*
  1. 将生成的HFile加载到HBase

    Code
    $hadoop jar hbase-mapreduce-2.2.4.jar completebulkload hdfs://mycluster/MRData/out ns1:t1

    这里要注意!可能是书上使用的HBase和我使用的不一样,导致命令失效,这个是采用HBase官网上找的命令。为此写了一篇博文:https://blog.csdn.net/weixin_42167895/article/details/106913686

    原书上命令如下:

    Code
    $ bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>

    另一种是

    Code
    HADOOP_CLASSPATH='${HBASE_HOME}/bin/hbase classpath' ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar completebulkload <hdfs://storefileoutput> <tablename>
  2. scan 查看表内容确实导入成功

6.3 HBase读取流程

6.3.1 Client-Server读取交互逻辑

和6.1写入阶段客户端处理方式一样。Client会首先从ZooKeeper中获取元数据hbase:meta表所在的RegionServer,然后根据待读写rowkey发送请求到元数据所在RegionServer,获取数据所在的目标RegionServer和Region(并存储到本地缓存),最后将请求进行封装发送到目标RegionServer进行处理。(参考6.1.1和4.3.1 Scan流程)

6.3.2 Server端Scan框架体系

从宏观角度看,依次Scan可以能会扫描一张表的多个Region,对于这种扫描,客户端会根据hbase:meta元数据将扫描的起始区间进行划分,切分成多个相互独立的查询子区间,每个子区间对应一个Regin。

比如:当前表有3个Region,区间分别为[“a”, “c”),[“c”, “e”),[“e”, “g”),客户端设置的scan扫描区间为[“b”, “f”)。因为扫描跨越了多个Region,所以需要进行切分,切分后的区间[“b”, “c”),[“c”, “e”),[“e”, “f”)

HBase种每个Region都是一个独立的存储引擎,因此客户端可以将每个子区间请求分别发送给对应的Region进行处理。

RegionServer接收到客户端的get/scan请求后:

avatar

1) 构建Scanner Iterator体系
  • 一个RegionScanner由多个StoreScanner构成。一张表由多少个列族组成,就有多少个StoreScanner,每个StoreScanner负责对应Store的数据查找。(即图上步骤1)
  • 一个StoreScanner由MemStore和StoreFileScanner构成。每个Store的数据由内存中的MemStore和磁盘上的StoreFile文件组成。因此分别为两者创建Scanner。(步骤2)

注意!RegionScanner和StoreScanner并不负责实际查找操作,他们更多是承当组织调度任务,负责KeyValue最终查找的操作的StoreFileScanner和MemStoreScanner。

2) 过滤淘汰部分部不满足查询条件的Scanner

StoreCanner为每一个HFile构建一个对应的StoreFileScanner,需要注意的是,并不是每一个HFile都包含用户想要查找的KeyValue。相反,可以用过一些查询条件过滤掉很多肯定不存在待查找KeyValue的HFile(步骤3,其中StoreFile3检查不通过而被过滤)。主要过滤策略有:

  • Rowkey Range过滤

    因为StoreFile中所有KeyValue数据都是有序排列的,所以如果待检索范围[startkey, stopkey]与文件起始key范围[firstkey, lastkey]没有交集,如stoprow<firstkey或者startkey>lastkey,就可以过滤掉该StoreFile。

  • Time Range过滤

    StoreFile中元数据有一个关于该File的TimeRange属性,和上面一样,如果范围没有交集,过滤。

  • 布隆过滤器

    在5.3.3第三点和6.1.3第三点中已经知道了结构。系统根据待检索的rowkey获取对应的Bloom Block并加载到内存(通常情况下热点Bloom Block会常驻内存),再用hash函数对待检索rowkey进行hash,根据hash后的结果再布隆过滤器数据中进行寻址,即可确定待检索rowkey是否一定不存在于该HFile中。

3) 每个Scanner seek 到 startKey

在每个HFile文件(或MemStoer)中seek扫描起始点startKey。如果没有HFile中没有找到startKey,则seek下一个KeyValue地址。(步骤4)

在一个HFile文件中seek待查找的key,该过程可以分解为4步操作:

  1. 根据HFile索引树定位目标Block

    HRegionServer打开HFile时会将所有HFile的Trailer部分和Load-on-open部分加载到内存。其中Load-on-open部分有一个非常重要的Root Index Block,索引树的根节点。

    avatar

    BlockKey是整个Block的第一个rowkey。(索引树流程图在5.3.3 第4点,红虚线箭头表示一次查询索引过程。第一次根据rowkey二分查找,找到区间的第一个rowkey,这个root index block常驻内存所以很快,然后根据偏移量和大小,把对应中间节点索引Block载到内存。然后在该Block中通过二分查找定位范围,指向叶子节点索引Block加载到内存,同样的再进行定位,最后指向Data Block,加载到内存。而Data Block中存放的都是KeyValue,所以通过遍历的方式找到对应的KeyValue,完成seek。)

    这里我有一个理解(重点):就是在上面过滤完Scanenr后,每一个Scanner对应一个HFile。首先它会吧HFile的Trailer部分和Load-on-open部分加载到内存,以便获取到索引树。然后根据索引树去查找rowkey所在的Data Block,获取对应的BlockOffset和BlockDataSize。然后利用这些信息先去BlockCache里面看有没有DataBlock的缓存,如果有直接用(即下面的第二点)。如果没有就要去HDFS文件中找(即下面的第三点,因为数据都是放在HDFS上的),找到后加载到内存并缓存到BlockCache里。然后从Block中遍历KeyValue(即下面第四点)。因此第一点是主流程,剩下的3点只是条件分支,下面第3点有一个总体的流程,和这个描述差不多。

  2. BlockCache 中检索目标Block

    Block缓存到BlockCache之后会构建一个Map,Map的Key是BlockKey,Value是Block在内存中的地址。其中BlockKey由两部分构成——HFile名称以及Block在HFile中的偏移量。很显然,BlockKey是全局唯一的。根据BlockKey可以获取该Block在BlockCache中内存位置(即通过Map),然后直接加载出该Block对象。如果Block在BlockCache中没有找到待查Block,就需要去HDFS文件中查找(即,缓存中没有就要去磁盘上找)。

  3. HDFS文件中检索目标Block

    在上文中说到根据文件索引提供的BlockOffset以及BlockDataSize这两个元素可以在HDSF上读取到对应的DataBlock内容。这个阶段HBase下发命令给HDFS,HDFS执行真正的Data Block查找工作。

    HBase阶段

    1. 根据HFile中Block索引信息定位给定KV所在的DataBlock的基本信息:BlockOffset和BlockDataSize
    2. 根据HFile以及BlockOffset和BlockDataSize在BlockCache中查找是否存在,如果在直接返回,否则在HDFS中进行文件查找
    3. 调用FSDataInputStream(HFile文件打开后HBase就会为该文件建议一个输入流,后续该文件的所有读操作都会使用这个全局的FSDataInputStream),返回该文件偏移量为BlcokOffset,数据量为BlockDataSize大小的数据

    HDFS-NameNode阶段

    1. NameNode根据文件返回属于该文件的所有HDFS Block列表,客户端(这里的客户端是指HBase客户端,调用HDFS的API进行读取数据)可以将HDFSBlock列表进行缓存。

      这里要说明一下,在HDFS中,文件被切分为多个Block(这里可以理解为一个HFile被分割成很多个Block存在磁盘上)。这里的HDFSBlock列表,也就相当于是多个HDFSBlock组合成一个HFile。

    2. 遍历HDFSBlock列表,根据HDFSBlock元数据以及HBaseBlockOffset、HBaseBlockDataSize确定待查找HBase Block所属的HDFSBlock。

      因为HFile中的Block存在于HDFSBlock列表中的部分HDFSBlock上,并不是所有HDFSBlock都有需要的数据。

      NameNode上会存储数据文件与这些HDFSBlock的对应关系。

    3. 定位到HDFSBlock之后,再在NameNode中查找HDFSBlock都在哪些DateNode上,根据一定的规则选择最优的DateNode(本地DN最优)

      NameNode存放的只是元数据信息,真正的数据是放在DataNode上的。

    HDFS-DataNode阶段

    1. 同选定DN建立通信,传进HDFSBlockId、数据在该Block中偏移量以及数据量大小

    2. DN会在该HDFSBlock中seek到指定偏移量,并从磁盘读取指定大小的数据返回。

      注意:ND不会加载出整个HDFSBlock数据(128M)

    磁盘阶段

    HDFS读取磁盘数据是按照磁盘最小IO单位(4K)进行读取,直至读取出完整的64K数据。

  4. 从Block中读取待查找KeyValue

    HFile Block由KeyValue(由小到大依次存储)构成,但这些KeyValue并不是固定长度的,只能遍历扫描查找。

4) KeyValueScanner合并构建最小堆

将该Store中的所有StoreFileScanner和MemStoreScanner合并形成一个heap(最小堆)。最小堆管理Scanner可以保证取出来的KeyValue都是最小的,保证有序。(步骤5)

5) 执行next函数获取KeyValue并对其进行条件过滤

经过上述Scanner体系的构建后,此时从小到大的KeyValue已经可以由KeyValueScanner获得,但是还要进一步检查,以及是否满足用户设定的过滤条件:

  1. 检查该KeyValue的KeyType是否是Deleted/DeletedColumn/DeletedFamily等,如果是,则直接忽略该列所有其他版本,跳到下一列(列族)。
  2. 检查该KeyValue的Timastamp是否在用户设定的Timestamp Range范围。
  3. 检查该KeyValue是否满足用户设置的各种filer过滤器
  4. 检查该KeyValue是否满足用户查询中设定的版本数

过滤完之后,返回给用户数据。每次返回多少数据还要看用户设置的参数。可以看4.3.1的内容。

七、Compaction 实现

待续。。。

文章作者: IT小王
文章链接: https://wangbowen.cn/2020/06/14/%E3%80%8AHBase%E5%8E%9F%E7%90%86%E4%B8%8E%E5%AE%9E%E8%B7%B5%E3%80%8B%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 IT小王

评论