HBase Study Notes

1. Introduction

  • The Hadoop database: a distributed, scalable store for very large data sets.

  • Designed for random, real-time read/write access to data.

  • Billions of rows x millions of columns.

  • A versioned, non-relational database.

  • Column-oriented database with column-oriented storage; tables are sorted by row. A cell is addressed by three coordinates: (row key, column family + column, version).


Features

  • Linear and modular scalability.
  • Strictly consistent reads and writes.
  • Automatic and configurable sharding of tables.
  • Automatic failover support between RegionServers.
  • Convenient base classes for backing Hadoop MapReduce jobs with Apache HBase tables.
  • Easy to use Java API for client access.
  • Block cache and Bloom Filters for real-time queries.
  • Query predicate push down via server side Filters.
  • Thrift gateway and a REST-ful Web service that supports XML, Protobuf, and binary data encoding options.
  • Extensible jruby-based (JIRB) shell.
  • Support for exporting metrics via the Hadoop metrics subsystem to files or Ganglia; or via JMX.

2. Setting Up an HBase Cluster

Cluster plan: node 201 runs the master; nodes 202-204 run the region servers.

  1. Install the JDK and Hadoop.

  2. Install hbase-2.2.4-bin.tar.gz on node 201, configure the environment variables, and distribute it to nodes 202-204.

    Code
    $tar -zxvf hbase-2.2.4-bin.tar.gz -C /soft/
    $ln -s hbase-2.2.4 hbase
    $sudo vi /etc/profile
    # Hbase
    export HBASE_HOME=/soft/hbase
    export PATH=$PATH:$HBASE_HOME/bin

    $source /etc/profile
    [root@s201 /soft/hbase]#xsync.sh /etc/profile
    [wbw@s201 /soft/hbase/conf]$xcall.sh 'source /etc/profile'
  3. Verify the installation.

    Code
    $hbase version
  4. Configure HBase in fully distributed mode.

    • Edit hbase/conf/hbase-env.sh

      Code
      # set the JDK path
      export JAVA_HOME=/soft/jdk
      # use an external ZooKeeper ensemble instead of the one managed by HBase
      export HBASE_MANAGES_ZK=false
    • Edit hbase-site.xml

      xml
      <!-- Run in fully distributed mode -->
      <property>
          <name>hbase.cluster.distributed</name>
          <value>true</value>
      </property>

      <!-- HDFS path where HBase stores its data -->
      <property>
          <name>hbase.rootdir</name>
          <value>hdfs://mycluster/hbase</value>
      </property>
      <!-- ZooKeeper quorum addresses -->
      <property>
          <name>hbase.zookeeper.quorum</name>
          <value>s201:2181,s202:2181,s203:2181</value>
      </property>
      <!-- Local data directory for ZooKeeper -->
      <property>
          <name>hbase.zookeeper.property.dataDir</name>
          <value>/home/wbw/zookeeper</value>
      </property>
    • Configure hbase/conf/regionservers

      Code
      s202
      s203
      s204
    • Because HBase stores its data on HDFS, copy Hadoop's HDFS configuration files (hdfs-site.xml and core-site.xml) into HBase's conf directory with the two commands below. [Without this step, the mycluster nameservice cannot be resolved.]

      Code
      [wbw@s201 /soft/hadoop/etc/hadoop]$cp hdfs-site.xml /soft/hbase/conf/
      [wbw@s201 /soft/hadoop/etc/hadoop]$cp core-site.xml /soft/hbase/conf/

      Then distribute to nodes 202-204.

      Code
      $xsync.sh hbase-2.2.4
      $xsync.sh hbase
[Alternative approach] (not recommended)

Code
1. Add the Hadoop configuration directory to the HBASE_CLASSPATH environment variable in hbase-env.sh and distribute the file.
[/soft/hbase/conf/hbase-env.sh]
export HBASE_CLASSPATH=$HBASE_CLASSPATH:/soft/hadoop/etc/hadoop

2. Create a symbolic link to Hadoop's hdfs-site.xml under hbase/conf/.
$>ln -s /soft/hadoop/etc/hadoop/hdfs-site.xml /soft/hbase/conf/hdfs-site.xml
Then distribute to nodes 202-204.
  5. Start HBase (with the ZooKeeper cluster already running)

    Code
    start-hbase.sh
    You should see HMaster on s201 and HRegionServer on s202-s204:
    [wbw@s201 /soft/hbase/conf]$xcall.sh jps
    ============= s201 : jps ==============
    2144 DFSZKFailoverController
    4337 Jps
    2456 ResourceManager
    1626 QuorumPeerMain
    4110 HMaster
    ============= s202 : jps ==============
    1409 QuorumPeerMain
    2164 HRegionServer
    2278 Jps
    1511 DataNode
    1577 JournalNode
    1645 NodeManager
    ============= s203 : jps ==============
    1377 QuorumPeerMain
    2241 Jps
    1619 NodeManager
    1485 DataNode
    1551 JournalNode
    2127 HRegionServer
    ============= s204 : jps ==============
    1552 NodeManager
    2086 HRegionServer
    2200 Jps
    1418 DataNode
    1484 JournalNode
    ============= s205 : jps ==============
    1440 ResourceManager
    1672 Jps
    1374 DFSZKFailoverController
    1311 NameNode

    Note: HMaster may fail to start!

    Check the log file:

    Code
    java.lang.IllegalStateException: The procedure WAL relies on the ability to hsync for proper operation during component failures, but the underlying filesystem does not support doing so. Please check the config value of 'hbase.procedure.store.wal.use.hsync' to set the desired level of robustness and ensure the config value of 'hbase.wal.dir' points to a FileSystem mount that can provide it.

    Add the following configuration to hbase-site.xml, then distribute it and restart the cluster.

    xml
    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>
  6. Check the web UI

    Code
    http://s201:16010


  7. HBase backup master (high availability) [optional]

    Run the following command on s202:

    Code
    hbase-daemon.sh start master

    This starts a standby master to keep the HBase cluster highly available (HA).

    Code
    ============= s202 : jps ==============
    1409 QuorumPeerMain
    3763 Jps
    1511 DataNode
    1577 JournalNode
    3485 HRegionServer
    1645 NodeManager
    3645 HMaster //!!!


3. HBase Shell

Any arguments to a shell command must be quoted with single quotes.

3.1 Entering the shell and getting help

Code
$hbase shell    // enter the shell
$hbase>help     // help

COMMAND GROUPS:
Group name: general
Commands: processlist, status, table_help, version, whoami

Group name: ddl
Commands: alter, alter_async, alter_status, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, list_regions, locate_region, show_filters

Group name: namespace
Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables

You can see that the commands are organized into groups.

3.2 Namespaces

A namespace is roughly equivalent to a database in MySQL.

Code
$hbase>help 'list_namespace'            // help for a specific command
$hbase>list_namespace                   // list namespaces (like databases in MySQL)
$hbase>list_namespace_tables 'default'  // list the tables in a namespace
$hbase>create_namespace 'ns1'           // create a namespace

3.3 Creating a table

Code
$hbase>help 'create'
$hbase>create 'ns1:t1','f1' // create a table in a namespace: ns1 = namespace, t1 = table name, f1 = column family

3.4 Dropping a table

A table must be disabled before it can be dropped.

Code
disable 'ns1:t1'
drop 'ns1:t1'

3.5 Insert and update

Code
$hbase>put 'ns1:t1','row1','f1:id',100      // insert data
$hbase>put 'ns1:t1','row1','f1:name','tom'  // row1 is the row key, followed by the column ('key') and the value

3.6 Reading data

Code
$hbase>get 'ns1:t1','row1'      // get the specified row
$hbase>scan 'ns1:t1'            // scan the table

4. Java API

  1. Add the dependency

    xml
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.4.1</version>
    </dependency>
  2. Add the configuration file

    Copy the cluster's hbase-site.xml into the module's src/main/resources directory.

  3. Write the code

    Note: if the Java client cannot connect to HBase, the cluster's hostnames may be missing from the hosts file of your local development machine.

4.1 Put (insert)

java
@Test
public void testPut() throws IOException {
    // create the configuration object
    Configuration conf = HBaseConfiguration.create();
    // create a connection via the connection factory
    Connection conn = ConnectionFactory.createConnection(conf);
    // get the table name object
    TableName tableName = TableName.valueOf("ns1:t1");
    // get the table
    Table table = conn.getTable(tableName);
    // convert the string row key into a byte array with the Bytes utility
    byte[] rowId = Bytes.toBytes("row3");
    // create the Put object
    Put put = new Put(rowId);
    byte[] f1 = Bytes.toBytes("f1");
    byte[] id = Bytes.toBytes("id");
    byte[] value = Bytes.toBytes(102);
    put.addColumn(f1, id, value);
    // execute the put
    table.put(put);
}

4.2 Batch insert

java
@Test
public void batchPut() throws IOException {

    // number formatting: row0001, row0002, ...
    DecimalFormat df = new DecimalFormat();
    df.applyPattern("0000");

    // create the configuration object
    Configuration conf = HBaseConfiguration.create();
    // create a connection via the connection factory
    Connection conn = ConnectionFactory.createConnection(conf);
    // get the table name object
    TableName tableName = TableName.valueOf("ns1:t1");
    // get the table
    HTable table = (HTable) conn.getTable(tableName);

    // 1. do not flush the client-side write buffer automatically
    table.setAutoFlush(false);

    for (int i = 1; i < 10000; i++) {
        Put put = new Put(Bytes.toBytes("row" + df.format(i)));
        // 2. skip the write-ahead log (risky)
        put.setWriteToWAL(false);
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(i));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom" + i));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(i % 100));
        table.put(put);

        // flush every 2000 puts
        if (i % 2000 == 0) {
            table.flushCommits();
        }
    }
    table.flushCommits();
}

4.3 Get a record

java
@Test
public void testGet() throws IOException {
    // create the configuration object
    Configuration conf = HBaseConfiguration.create();
    // create a connection via the connection factory
    Connection conn = ConnectionFactory.createConnection(conf);
    // get the table name object
    TableName tableName = TableName.valueOf("ns1:t1");
    // get the table
    Table table = conn.getTable(tableName);
    // convert the string row key into a byte array with the Bytes utility
    byte[] rowId = Bytes.toBytes("row3");
    // create the Get object
    Get get = new Get(rowId);
    Result r = table.get(get);
    byte[] idValue = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"));
    System.out.println(Bytes.toInt(idValue));
}

4.4 Listing namespaces

java
@Test
public void listNameSpaces() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    Admin admin = conn.getAdmin();

    NamespaceDescriptor[] ns = admin.listNamespaceDescriptors();
    for (NamespaceDescriptor n : ns) {
        System.out.println(n.getName());
    }
}

4.5 Creating a namespace

java
@Test
public void createNameSpace() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    Admin admin = conn.getAdmin();
    // create the namespace descriptor
    NamespaceDescriptor nsd = NamespaceDescriptor.create("ns2").build();
    admin.createNamespace(nsd);

    // list the namespaces
    NamespaceDescriptor[] ns = admin.listNamespaceDescriptors();
    for (NamespaceDescriptor n : ns) {
        System.out.println(n.getName());
    }
}

4.6 Creating a table

java
@Test
public void createTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    Admin admin = conn.getAdmin();
    // create the table name object
    TableName tableName = TableName.valueOf("ns2:t2");
    // create the table descriptor
    HTableDescriptor tb1 = new HTableDescriptor(tableName);
    // create the column family descriptor
    HColumnDescriptor col = new HColumnDescriptor("f1");
    tb1.addFamily(col);

    admin.createTable(tb1);
}

4.7 Listing tables in a namespace

java
@Test
public void listTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    Admin admin = conn.getAdmin();
    HTableDescriptor[] htd = admin.listTableDescriptorsByNamespace("ns2");
    for (HTableDescriptor hTableDescriptor : htd) {
        System.out.println(hTableDescriptor.getTableName());
    }
}

4.8 Disabling and deleting a table

java
@Test
public void disableTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    Admin admin = conn.getAdmin();
    // disable the table
    admin.disableTable(TableName.valueOf("ns2:t2"));
    // delete the table
    admin.deleteTable(TableName.valueOf("ns2:t2"));
}

4.9 Deleting data

java
@Test
public void deleteData() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);

    // select the row
    Delete delete = new Delete(Bytes.toBytes("row0001"));
    // specify the columns to delete
    delete.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"));
    delete.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));

    table.delete(delete);
}

4.10 Scanning a table (specific column family and columns)

java
@Test
public void scan() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);

    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("row0002"));
    scan.withStopRow(Bytes.toBytes("row0010"));

    ResultScanner rs = table.getScanner(scan);
    Iterator<Result> iterator = rs.iterator();
    while (iterator.hasNext()) {
        Result result = iterator.next();
        byte[] name = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        byte[] age = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"));
        byte[] id = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"));
        System.out.print(Bytes.toInt(id) + ",");
        System.out.print(Bytes.toString(name) + ",");
        System.out.println(Bytes.toInt(age));
    }
}

4.11 Scanning a table (by column family)

java
@Test
public void scan2() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);

    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("row0001"));
    scan.withStopRow(Bytes.toBytes("row0010"));

    ResultScanner rs = table.getScanner(scan);
    Iterator<Result> iterator = rs.iterator();
    while (iterator.hasNext()) {
        Result result = iterator.next();
        NavigableMap<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes("f1"));
        for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
            String col = Bytes.toString(entry.getKey());
            String value = Bytes.toString(entry.getValue());
            System.out.print(col + ":" + value + ",");
        }
        System.out.println();
    }
}

4.12 Scanning a table (full row map)

java
@Test
public void scan3() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);

    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("row0001"));
    scan.withStopRow(Bytes.toBytes("row0010"));

    ResultScanner rs = table.getScanner(scan);
    Iterator<Result> iterator = rs.iterator();
    while (iterator.hasNext()) {
        Result result = iterator.next();
        // full map of a row: key = column family, value = Map<qualifier, Map<timestamp, value>>
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
        for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) {
            // column family
            String f = Bytes.toString(entry.getKey());
            Map<byte[], NavigableMap<Long, byte[]>> colDataMap = entry.getValue();
            for (Map.Entry<byte[], NavigableMap<Long, byte[]>> ets : colDataMap.entrySet()) {
                String c = Bytes.toString(ets.getKey());
                Map<Long, byte[]> tsValueMap = ets.getValue();
                for (Map.Entry<Long, byte[]> e : tsValueMap.entrySet()) {
                    Long ts = e.getKey();
                    String value = Bytes.toString(e.getValue());
                    System.out.print(f + ":" + c + ":" + ts + "=" + value + ",");
                }
            }
        }
        System.out.println();
    }
}

5. HBase Architecture

5.1 HBase write path

  • WAL (write-ahead log)
  • (incomplete, to be continued)

5.2 HBase on HDFS

Data belonging to the same column family is stored in the same file.

  • Directory layout for table data

    Code
    hdfs://s201:8020/hbase/data/${namespace}/${table}/${region}/${column family}/${file}
  • Directory layout for WALs

    Code
    hdfs://s201:8020/hbase/WALs/${region server name: hostname,port,start timestamp}/

5.3 Client interaction flow

  1. When the HBase cluster starts, the master assigns regions to specific region servers.

  2. The client contacts ZooKeeper to find the region server (RS) hosting the meta table [/hbase/meta-region-server].

  3. Using the meta table, it looks up the row key and finds the region server responsible for it.

  4. The location information is cached locally.

  5. The client then contacts that RegionServer directly.

  6. The HRegionServer opens the HRegion object and creates a Store object for each column family. A Store contains multiple StoreFile instances, which are lightweight wrappers around HFiles; each Store also has a MemStore that buffers data in memory. (A client-side sketch of the region lookup follows this list.)
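
Not part of the original notes: a minimal sketch, assuming the ns1:t1 table from earlier exists, that uses the client's RegionLocator to show which region and region server a given row key resolves to. This is the same meta-table lookup the client performs and caches internally.

java
@Test
public void locateRegion() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);

    // RegionLocator exposes the meta-table lookup that the client normally does behind the scenes
    RegionLocator locator = conn.getRegionLocator(TableName.valueOf("ns1:t1"));
    HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("row0001"));

    System.out.println("region : " + loc.getRegionInfo().getRegionNameAsString());
    System.out.println("server : " + loc.getServerName());

    locator.close();
    conn.close();
}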

6. Regions

6.1 Region splitting

HBase splits a region once its files grow past 10 GB (hbase.hregion.max.filesize); a per-table override is sketched after the property below.

xml
<property>
    <name>hbase.hregion.max.filesize</name>
    <value>10737418240</value>
    <source>hbase-default.xml</source>
</property>
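
The same threshold can be changed per table from the Java Admin API. A minimal sketch (not in the original notes), assuming ns1:t1 exists, that lowers that table's split size to 1 GB:

java
@Test
public void setRegionMaxFileSize() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();

    TableName tn = TableName.valueOf("ns1:t1");
    HTableDescriptor desc = admin.getTableDescriptor(tn);
    // per-table override of hbase.hregion.max.filesize: split at 1 GB instead of the 10 GB default
    desc.setMaxFileSize(1024L * 1024 * 1024);
    admin.modifyTable(tn, desc);

    admin.close();
    conn.close();
}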

6.2 Splitting from the shell

Code
$hbase>scan 'hbase:meta'    // view the metadata table


Code
$hbase>split 'ns1:t1'       // split the table


Code
// split a region (split the second region from above again)
hbase(main):001:0> split 'ns1:t1,row5676,1586006932311.3e0703e224357f9111ee569021726486.','row8888'
0 row(s) in 0.5010 seconds


6.3 Manually moving a region

Code
hbase(main):002:0> help 'move'
Move a region. Optionally specify target regionserver else we choose one
at random. NOTE: You pass the encoded region name, not the region name so
this command is a little different to the others. The encoded region name
is the hash suffix on region names: e.g. if the region name were
TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396. then
the encoded region name portion is 527db22f95c8a9e0116f0cc13c680396
A server name is its host, port plus startcode. For example:
host187.example.com,60020,1289493121758
Examples:

hbase> move 'ENCODED_REGIONNAME'
hbase> move 'ENCODED_REGIONNAME', 'SERVER_NAME'

That is:

  • ENCODED_REGIONNAME can be found in the metadata table (hbase:meta)
  • SERVER_NAME is shown on the /Home page of the web UI (a Java equivalent of move is sketched below)
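
The same move can be issued through the Java Admin API. A minimal sketch (not in the original notes); the encoded region name and the server name are placeholders to be taken from hbase:meta and the web UI respectively:

java
@Test
public void moveRegion() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();

    // placeholder values: encoded region name from 'scan hbase:meta',
    // server name (host,port,startcode) from the web UI
    admin.move(Bytes.toBytes("527db22f95c8a9e0116f0cc13c680396"),
               Bytes.toBytes("s203,16020,1586006932311"));

    admin.close();
    conn.close();
}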

6.4 Merging regions

Code
hbase(main):003:0> help 'merge_region'
Merge two regions. Passing 'true' as the optional third parameter will force
a merge ('force' merges regardless else merge will fail unless passed
adjacent regions. 'force' is for expert use only).

NOTE: You must pass the encoded region name, not the full region name so
this command is a little different from other region operations. The encoded
region name is the hash suffix on region names: e.g. if the region name were
TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396. then
the encoded region name portion is 527db22f95c8a9e0116f0cc13c680396

Examples:

hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME'
hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME', true

6.5 Pre-splitting

Split the table into regions in advance, at creation time (a Java equivalent follows the shell example below).

Code
create 'ns1:t2','f1',SPLITS => ['row3000','row6000']
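
The same pre-split can be done from the Java API. A minimal sketch (not in the original notes) using the same split points, assuming ns1:t2 does not already exist:

java
@Test
public void createPreSplitTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();

    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("ns1:t2"));
    desc.addFamily(new HColumnDescriptor("f1"));

    // the same split points as the shell command above, giving three initial regions
    byte[][] splitKeys = new byte[][]{
            Bytes.toBytes("row3000"),
            Bytes.toBytes("row6000")
    };
    admin.createTable(desc, splitKeys);

    admin.close();
    conn.close();
}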


7. Versioning

7.1 Retrieving versions

When creating a table, specify the number of versions kept by the column family (i.e. the N most recent changes are retained).

Code
create 'ns1:t3',{NAME=>'f1',VERSIONS=>3}

Now add four records.

Code
hbase(main):002:0> put 'ns1:t3','row1','f1','tom1'
0 row(s) in 0.1670 seconds

hbase(main):003:0> put 'ns1:t3','row1','f1','tom2'
0 row(s) in 0.0790 seconds

hbase(main):004:0> put 'ns1:t3','row1','f1','tom3'
0 row(s) in 0.0280 seconds

hbase(main):005:0> put 'ns1:t3','row1','f1','tom4'
0 row(s) in 0.0330 seconds

Query a given number of versions (even though 4 are requested, the column family keeps only 3 versions, so only 3 rows are shown):

Code
hbase(main):007:0> get 'ns1:t3','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN CELL
f1: timestamp=1586336405057, value=tom4
f1: timestamp=1586336401776, value=tom3
f1: timestamp=1586336398687, value=tom2

7.2 Raw scan

This is an expert-level operation: RAW=>true.

Code
hbase(main):011:0> put 'ns1:t3','row1','f1','tom5'
0 row(s) in 0.0230 seconds

hbase(main):012:0> scan 'ns1:t3',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW COLUMN+CELL
row1 column=f1:, timestamp=1586336766575, value=tom5
row1 column=f1:, timestamp=1586336405057, value=tom4
row1 column=f1:, timestamp=1586336401776, value=tom3
row1 column=f1:, timestamp=1586336398687, value=tom2

hbase(main):020:0> get 'ns1:t3','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN CELL
f1: timestamp=1586336766575, value=tom5
f1: timestamp=1586336405057, value=tom4
f1: timestamp=1586336401776, value=tom3

A raw scan shows the complete history. Next, delete one of the records and look again: get automatically fills in an older version to keep three results, while the raw scan shows the delete marker with type=Delete.

Code
hbase(main):025:0> delete 'ns1:t3','row1','f1',1586336405057

hbase(main):026:0> get 'ns1:t3','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN CELL
f1: timestamp=1586336766575, value=tom5
f1: timestamp=1586336401776, value=tom3
f1: timestamp=1586336398687, value=tom2

hbase(main):027:0> scan 'ns1:t3',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW COLUMN+CELL
row1 column=f1:, timestamp=1586336766575, value=tom5
row1 column=f1:, timestamp=1586336405057, type=Delete
row1 column=f1:, timestamp=1586336405057, value=tom4
row1 column=f1:, timestamp=1586336401776, value=tom3
row1 column=f1:, timestamp=1586336398687, value=tom2

8. Other Parameters

8.1 TTL (time to live)

Data is cleaned up automatically after a set period.

Once the TTL has expired, even a raw scan can no longer find the data.

TTL is in seconds: TTL=>10.

Code
# create a table with a TTL
hbase(main):031:0> create 'ns1:t4',{NAME=>'f1',TTL=>10,VERSIONS=>3}
# insert one record
hbase(main):032:0> put 'ns1:t4','row1','f1','tom1'
# query within the first 10 seconds
hbase(main):033:0> get 'ns1:t4','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN CELL
f1: timestamp=1586337429892, value=tom1
# query after 10 seconds
hbase(main):034:0> get 'ns1:t4','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN CELL
# a raw scan can still see the data
hbase(main):035:0> scan 'ns1:t4',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW COLUMN+CELL
row1 column=f1:, timestamp=1586337429892, value=tom1
# flush to disk!
hbase(main):036:0> flush 'ns1:t4'
# raw scan again: the data is now gone
hbase(main):037:0> scan 'ns1:t4',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW COLUMN+CELL
0 row(s) in 0.0240 seconds

8.2 KEEP_DELETED_CELLS (keeping deleted cells)

Controls whether the data is retained after the key is deleted: KEEP_DELETED_CELLS=>true.

Code
# create a table with KEEP_DELETED_CELLS enabled
hbase(main):038:0> create 'ns1:t5',{NAME=>'f1',TTL=>10,VERSIONS=>3,KEEP_DELETED_CELLS=>true}
# add one record
hbase(main):039:0> put 'ns1:t5','row1','f1','tom1'
# check after 10 seconds: the data appears to be gone
hbase(main):040:0> get 'ns1:t5','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN CELL
0 row(s) in 0.0140 seconds
# a raw scan shows the data still exists
hbase(main):041:0> scan 'ns1:t5',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW COLUMN+CELL
row1 column=f1:, timestamp=1586337719189, value=tom1
# flush
hbase(main):042:0> flush 'ns1:t4'
0 row(s) in 0.2030 seconds
# raw scan again: the record is indeed still there
hbase(main):043:0> scan 'ns1:t5',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW COLUMN+CELL
row1 column=f1:, timestamp=1586337719189, value=tom1

9. Tuning

These settings do not affect the results, only the performance.

9.1 Scanner caching (row level)

java
Scan scan = new Scan();
scan.setCaching(5000); // number of rows fetched per RPC round trip

9.2 Scanner batching (column level)

java
Scan scan = new Scan();
scan.setBatch(5); // each call to next() returns at most 5 columns

9.3 Filters

9.3.1 Row filter

java
// rows with key less than or equal to 'row0100'
Scan scan = new Scan();
RowFilter rowFilter = new RowFilter(
        CompareFilter.CompareOp.LESS_OR_EQUAL,
        new BinaryComparator(Bytes.toBytes("row0100")));
scan.setFilter(rowFilter);

9.3.2 Family filter

java
Scan scan = new Scan();
FamilyFilter filter = new FamilyFilter(
        CompareFilter.CompareOp.LESS,
        new BinaryComparator(Bytes.toBytes("f2"))); // binary comparator
scan.setFilter(filter);

9.3.3 Qualifier filter

java
Scan scan = new Scan();
QualifierFilter colfilter = new QualifierFilter(
        CompareFilter.CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("id")));
scan.setFilter(colfilter);

9.3.4 Value filter

java
Scan scan = new Scan();
ValueFilter filter = new ValueFilter(
        CompareFilter.CompareOp.EQUAL,
        new SubstringComparator("to")); // substring comparator
scan.setFilter(filter);

9.3.5 Dependent column filter

java
Scan scan = new Scan();
DependentColumnFilter filter = new DependentColumnFilter(
        Bytes.toBytes("f2"),               // column family
        Bytes.toBytes("addr"),             // column
        true,                              // whether to drop the reference column from the result
        CompareFilter.CompareOp.NOT_EQUAL, // comparison operation
        new BinaryComparator(Bytes.toBytes("beijing"))); // comparator
scan.setFilter(filter);

9.3.6 Single column value filter

Filters on the value of a single column; rows whose value does not match are dropped entirely.

java
Scan scan = new Scan();
SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("f2"),               // column family
        Bytes.toBytes("name"),             // column
        CompareFilter.CompareOp.NOT_EQUAL,
        new BinaryComparator(Bytes.toBytes("tom2.1")));
scan.setFilter(filter);
// SingleColumnValueExcludeFilter: same value test, but the column used for filtering is excluded from the result

9.3.7 Prefix filter

Prefix filtering on the row key, i.e. where rowkey like 'row222%'.

java
Scan scan = new Scan();
PrefixFilter filter = new PrefixFilter(Bytes.toBytes("row222"));
scan.setFilter(filter);

9.3.8 Page filter

Paging filter on row keys: the page size is applied per region while scanning.

When results are returned to the client, the per-region results still have to be merged, so the client can receive more rows than one page.

java
Scan scan = new Scan();
PageFilter filter = new PageFilter(10);
scan.setFilter(filter);

9.3.9 KeyOnly filter

The KeyOnly filter returns only the keys and discards the values.

java
Scan scan = new Scan();
KeyOnlyFilter filter = new KeyOnlyFilter();
scan.setFilter(filter);

9.3.10 FilterList (combining filters)

For complex queries.

java
@Test
public void testComboFilter() throws IOException {

    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tname = TableName.valueOf("ns1:t7");
    Scan scan = new Scan();

    // where ... f2:age <= 13
    SingleColumnValueFilter ftl = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("age"),
            CompareFilter.CompareOp.LESS_OR_EQUAL,
            new BinaryComparator(Bytes.toBytes("13"))
    );

    // where ... f2:name like 't%'
    SingleColumnValueFilter ftr = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("name"),
            CompareFilter.CompareOp.EQUAL,
            new RegexStringComparator("^t")
    );
    // ft = (age <= 13 AND name like 't%')
    FilterList ft = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    ft.addFilter(ftl);
    ft.addFilter(ftr);

    // where ... f2:age > 13
    SingleColumnValueFilter fbl = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("age"),
            CompareFilter.CompareOp.GREATER,
            new BinaryComparator(Bytes.toBytes("13"))
    );

    // where ... f2:name like '%t'
    SingleColumnValueFilter fbr = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("name"),
            CompareFilter.CompareOp.EQUAL,
            new RegexStringComparator("t$")
    );
    // fb = (age > 13 AND name like '%t')
    FilterList fb = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    fb.addFilter(fbl);
    fb.addFilter(fbr);

    // fall = ft OR fb
    FilterList fall = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    fall.addFilter(ft);
    fall.addFilter(fb);

    scan.setFilter(fall);
    Table t = conn.getTable(tname);
    ResultScanner rs = t.getScanner(scan);
    Iterator<Result> it = rs.iterator();
    while (it.hasNext()) {
        Result r = it.next();
        byte[] f1id = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"));
        byte[] f2id = r.getValue(Bytes.toBytes("f2"), Bytes.toBytes("id"));
        byte[] f1name = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        byte[] f2name = r.getValue(Bytes.toBytes("f2"), Bytes.toBytes("name"));
        System.out.println(f1id + " : " + f2id + " : " + Bytes.toString(f1name) + " : " + Bytes.toString(f2name));
    }
}

9.3.11 BloomFilter
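
The original notes leave this section empty. In brief: a Bloom filter is a per-StoreFile structure that lets HBase skip files that definitely do not contain a given row (or row + column), which speeds up point gets. With the 1.x client API used above it can be enabled per column family; a minimal sketch, where the table name ns1:t9 is made up for illustration:

java
@Test
public void createTableWithBloomFilter() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();

    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("ns1:t9")); // hypothetical table
    HColumnDescriptor f1 = new HColumnDescriptor("f1");
    // BloomType.ROW filters on the row key; BloomType.ROWCOL also takes the column qualifier into account
    f1.setBloomFilterType(BloomType.ROW);
    desc.addFamily(f1);

    admin.createTable(desc);
    admin.close();
    conn.close();
}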

10. Counters

Beyond the features discussed so far, HBase offers another advanced feature: counters. Many applications that collect statistics, such as click streams or online ad impressions, traditionally write these events to log files for later analysis. Counters let you compute such statistics in real time, avoiding high-latency batch processing.

Key properties: fast and atomic!

Shell

Code
$hbase>incr 'ns1:t8','row1','f1:click',1
$hbase>get_counter 'ns1:t8','row1','f1:click'

JAVA API

java
@Test
public void testIncr() throws IOException {

    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tname = TableName.valueOf("ns1:t8");
    Table t = conn.getTable(tname);
    Increment incr = new Increment(Bytes.toBytes("row1"));
    incr.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("daily"), 1);
    incr.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("weekly"), 10);
    incr.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("monthly"), 100);
    t.increment(incr);
}

11. Coprocessors

Server-side batch processing, roughly equivalent to stored procedures or triggers.


Observer

Observers are like triggers: they are event-driven, and the corresponding callback method is invoked when an action occurs.

  • RegionObserver: region observer on the RegionServer. Load order: system -> user.

  • MasterObserver: runs on the master node.

  • WALObserver

xml
[hbase-site.xml]
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
    <name>hbase.coprocessor.master.classes</name>
    <value>coprocessor.MasterObserverExample</value>
</property>
<property>
    <name>hbase.coprocessor.wal.classes</name>
    <value>coprocessor.WALObserverExample, bar.foo.MyWALObserver</value>
</property>

Endpoint

Endpoints are similar to stored procedures.

Steps

  1. Add the dependencies

    xml
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.4.1</version>
        </dependency>
    </dependencies>
  2. Write a custom observer

    Methods starting with pre are triggered before the operation; those starting with post are triggered after it.

    java
    package cn.wangbowen.hbasedemo.coprocessor;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.List;
    public class MyRegionObserver extends BaseRegionObserver {
    private void outInfo(String str){
    try {
    FileWriter fw = new FileWriter("/home/centos/coprocessor.txt",true);
    fw.write(str + "\r\n");
    fw.close();
    } catch (Exception e) {
    e.printStackTrace();
    }

    }
    // invoked once for each region when the coprocessor starts
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
    super.start(e);
    outInfo("MyRegionObserver.start()");
    }

    @Override
    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
    super.preOpen(e);
    outInfo("MyRegionObserver.preOpen()");
    }

    @Override
    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
    super.postOpen(e);
    outInfo("MyRegionObserver.postOpen()");
    }

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
    super.preGetOp(e, get, results);
    String rowkey = Bytes.toString(get.getRow());
    outInfo("MyRegionObserver.preGetOp() : rowkey = " + rowkey);
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
    super.postGetOp(e, get, results);
    String rowkey = Bytes.toString(get.getRow());
    outInfo("MyRegionObserver.postGetOp() : rowkey = " + rowkey);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
    super.prePut(e, put, edit, durability);
    String rowkey = Bytes.toString(put.getRow());
    outInfo("MyRegionObserver.prePut() : rowkey = " + rowkey);
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
    super.postPut(e, put, edit, durability);
    String rowkey = Bytes.toString(put.getRow());
    outInfo("MyRegionObserver.postPut() : rowkey = " + rowkey);
    }

    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
    super.preDelete(e, delete, edit, durability);
    String rowkey = Bytes.toString(delete.getRow());
    outInfo("MyRegionObserver.preDelete() : rowkey = " + rowkey);
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
    super.postDelete(e, delete, edit, durability);
    String rowkey = Bytes.toString(delete.getRow());
    outInfo("MyRegionObserver.postDelete() : rowkey = " + rowkey);
    }

    }
  3. Register the coprocessor and distribute the configuration

    xml
    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>cn.wangbowen.hbasedemo.coprocessor.MyRegionObserver</value>
    </property>
  4. Export the jar, upload it, and distribute it to the hbase/lib directory on every node of the HBase cluster

  5. Restart the cluster for the change to take effect

12. RowKey Design Principles

A rowkey is usually composed of several fields, e.g. xx,yyy,zzz.

Hash one of the fields and add it as a prefix so that keys are spread out; do not lead with a timestamp.

Order the fields that make up the rowkey according to the actual business query patterns (a sketch follows).
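
A minimal sketch (not in the original notes) of a salted rowkey: the field names userId and eventTime are made up for illustration, and the short hash prefix is one common way to keep writes spread across regions instead of hot-spotting on the newest timestamp.

java
// builds a rowkey of the form <4-hex-digit salt>_<userId>_<eventTime>
public static byte[] buildRowKey(String userId, long eventTime) {
    int salt = userId.hashCode() & 0xFFFF; // short hash prefix derived from a business field
    String key = String.format("%04x_%s_%d", salt, userId, eventTime);
    return Bytes.toBytes(key);
}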

13. Integrating HBase with Hive

Hive provides richer SQL query capabilities, while HBase has an excellent storage layout, so Hive can be used to query HBase.

Integration steps

  1. Copy the relevant HBase jars into Hive's lib directory

    Code
    hbase-server-2.2.4.jar
    hbase-client-2.2.4.jar
    hbase-it-2.2.4.jar
    hbase-hadoop2-compat-2.2.4.jar
    hbase-hadoop-compat-2.2.4.jar
    hbase-common-2.2.4.jar

    # This jar is not under HBase's lib directory; download it from: http://www.java2s.com/Code/Jar/h/Downloadhighscalelib10jar.htm

    high-scale-lib-1.1.1-sources.jar

    Copy command (the high-scale-lib jar must be downloaded and uploaded separately):

    Code
    cp hbase-server-2.2.4.jar hbase-client-2.2.4.jar hbase-it-2.2.4.jar hbase-hadoop2-compat-2.2.4.jar hbase-hadoop-compat-2.2.4.jar /soft/hive/lib/
  2. Configure ZooKeeper in hive-site.xml under Hive's conf directory; Hive uses this property to connect to the HBase cluster.

    xml
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>s201,s202,s203</value>
    </property>
  3. Create a table in HBase (see the table creation and insert examples above).

  4. Start Hive and create the mapping (start MySQL first, since the Hive metastore lives there).

    Code
    create external table t1(
    key string,
    name string,
    id int
    )
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,f1:name,f1:id")
    TBLPROPERTIES("hbase.table.name" = "ns1:t1");

    After creation, this table does not exist in HBase; it exists only in Hive.
    An external table created the ordinary way would contain no data, because the data behind this Hive table is stored in HBase, not in Hive.

  5. Query it

    Code
    hive (default)> select * from t1;
    row1 tom 100
Author: IT小王
Original post: https://wangbowen.cn/2020/03/29/HBase%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/
License: unless otherwise noted, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit IT小王 when reposting.
