HBase Study Notes

1. Introduction

Features
Linear and modular scalability.
Strictly consistent reads and writes.
Automatic and configurable sharding of tables.
Automatic failover support between RegionServers.
Convenient base classes for backing Hadoop MapReduce jobs with Apache HBase tables.
Easy-to-use Java API for client access.
Block cache and Bloom filters for real-time queries.
Query predicate push-down via server-side filters.
Thrift gateway and a RESTful web service that supports XML, Protobuf, and binary data encoding options.
Extensible JRuby-based (JIRB) shell.
Support for exporting metrics via the Hadoop metrics subsystem to files or Ganglia, or via JMX.
2. Setting Up an HBase Cluster

Cluster plan: s201 is the master; s202-s204 run the region servers.

JDK and Hadoop should already be installed.

Install hbase-2.2.4-bin.tar.gz on s201, configure the environment variables, and distribute to s202-s204:
$tar -zxvf hbase-2.2.4-bin.tar.gz -C /soft/
$ln -s hbase-2.2.4 hbase
$sudo vi /etc/profile
# HBase
export HBASE_HOME=/soft/hbase
export PATH=$PATH:$HBASE_HOME/bin
$source /etc/profile
[root@s201 /soft/hbase]#xsync.sh /etc/profile
[wbw@s201 /soft/hbase/conf]$xcall.sh 'source /etc/profile'
Verify that the installation succeeded.
Configure HBase in fully distributed mode.

Edit hbase/conf/hbase-env.sh:
# Point to the JDK installation
export JAVA_HOME=/soft/jdk
# Use our own ZooKeeper ensemble instead of the managed one
export HBASE_MANAGES_ZK=false
Edit hbase-site.xml:
<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>
<property>
  <name>hbase.rootdir</name>
  <value>hdfs://mycluster/hbase</value>
</property>
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>s201:2181,s202:2181,s203:2181</value>
</property>
<property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/home/wbw/zookeeper</value>
</property>
Edit hbase/conf/regionservers and list the region server hosts (s202-s204).

Because HBase stores its data on HDFS, copy Hadoop's HDFS configuration files (hdfs-site.xml and core-site.xml) into HBase's conf directory with the following two commands. (Without this step, the mycluster nameservice cannot be resolved.)
[wbw@s201 /soft/hadoop/etc/hadoop]$cp hdfs-site.xml /soft/hbase/conf/
[wbw@s201 /soft/hadoop/etc/hadoop]$cp core-site.xml /soft/hbase/conf/
Then distribute to s202-s204:

$xsync.sh hbase-2.2.4
$xsync.sh hbase
Alternative approach (not recommended):

1. In hbase-env.sh, add the Hadoop configuration directory to the HBASE_CLASSPATH environment variable and distribute it.
[/soft/hbase/conf/hbase-env.sh]
export HBASE_CLASSPATH=$HBASE_CLASSPATH:/soft/hadoop/etc/hadoop
2. Create a symbolic link to Hadoop's hdfs-site.xml under hbase/conf/.
$>ln -s /soft/hadoop/etc/hadoop/hdfs-site.xml /soft/hbase/conf/hdfs-site.xml

Then distribute to s202-s204.
Start HBase (with the ZooKeeper ensemble already running):

start-hbase.sh

You should see an HMaster on s201 and HRegionServers on s202-s204:

[wbw@s201 /soft/hbase/conf]$xcall.sh jps
============= s201 : jps ==============
2144 DFSZKFailoverController
4337 Jps
2456 ResourceManager
1626 QuorumPeerMain
4110 HMaster
============= s202 : jps ==============
1409 QuorumPeerMain
2164 HRegionServer
2278 Jps
1511 DataNode
1577 JournalNode
1645 NodeManager
============= s203 : jps ==============
1377 QuorumPeerMain
2241 Jps
1619 NodeManager
1485 DataNode
1551 JournalNode
2127 HRegionServer
============= s204 : jps ==============
1552 NodeManager
2086 HRegionServer
2200 Jps
1418 DataNode
1484 JournalNode
============= s205 : jps ==============
1440 ResourceManager
1672 Jps
1374 DFSZKFailoverController
1311 NameNode
Note: the HMaster may fail to start!

Check the log file:
java.lang.IllegalStateException: The procedure WAL relies on the ability to hsync for proper operation during component failures, but the underlying filesystem does not support doing so. Please check the config value of 'hbase.procedure.store.wal.use.hsync' to set the desired level of robustness and ensure the config value of 'hbase.wal.dir' points to a FileSystem mount that can provide it.
Add the following property to hbase-site.xml, distribute it, and restart the cluster.
<property>
  <name>hbase.unsafe.stream.capability.enforce</name>
  <value>false</value>
</property>
Check the web UI (the master listens on port 16010 by default).

HBase backup master (high availability) [optional]

Run the following command on s202:
hbase-daemon.sh start master
This starts a standby Master to keep the HBase cluster highly available (HA).
============= s202 : jps ==============
1409 QuorumPeerMain
3763 Jps
1511 DataNode
1577 JournalNode
3485 HRegionServer
1645 NodeManager
3645 HMaster   //!!!
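Alternatively, the standby can be started together with the rest of the cluster by listing its host in conf/backup-masters. A minimal sketch (one host per line; s202 taken from this cluster plan):

# start-hbase.sh will launch a standby HMaster on every host listed here
[wbw@s201 /soft/hbase/conf]$echo 's202' > backup-masters
[wbw@s201 /soft/hbase/conf]$xsync.sh backup-masters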
3. HBase Shell

If a shell command takes arguments, they must be wrapped in single quotes.

3.1 Entering the shell and getting help

$hbase shell   //enter the shell
$hbase>help    //help

COMMAND GROUPS:
Group name: general
Commands: processlist, status, table_help, version, whoami
Group name: ddl
Commands: alter, alter_async, alter_status, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, list_regions, locate_region, show_filters
Group name: namespace
Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables
As you can see, the commands are organized into groups.

3.2 Namespaces

A namespace is roughly analogous to a database in MySQL.

$hbase>help 'list_namespace'            //help for a specific command
$hbase>list_namespace                   //list namespaces (like databases in MySQL)
$hbase>list_namespace_tables 'default'  //list the tables in a namespace
$hbase>create_namespace 'ns1'           //create a namespace
3.3 Creating a table

$hbase>help 'create'
$hbase>create 'ns1:t1','f1'   //create a table in a namespace: ns1 is the namespace, t1 the table name, f1 a column family
3.4 Dropping a table

A table must be disabled before it can be dropped.

disable 'ns1:t1'
drop 'ns1:t1'
3.5 Inserting and updating

$hbase>put 'ns1:t1','row1','f1:id',100      //insert data
$hbase>put 'ns1:t1','row1','f1:name','tom'  //row1 is the row key, followed by the column and the value

3.6 Reading data

$hbase>get 'ns1:t1','row1'   //get the specified row
$hbase>scan 'ns1:t1'         //scan the table
4. Java API

Add the dependency:
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.4.1</version>
</dependency>
Add the configuration file:

Copy hbase-site.xml from the HBase cluster into the module's src/main/resources directory.

Write the code.

Note: if the Java client cannot connect to HBase, the local development machine's hosts file may be missing the cluster's host entries.
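For example, the local hosts file might need entries like the following (the IP addresses are assumptions; use your cluster's real addresses):

# /etc/hosts (on Windows: C:\Windows\System32\drivers\etc\hosts)
192.168.1.201 s201
192.168.1.202 s202
192.168.1.203 s203
192.168.1.204 s204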
4.1 Inserting a row

@Test
public void testPut() throws IOException {
    // hbase-site.xml is picked up from the classpath
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);
    byte[] rowId = Bytes.toBytes("row3");
    Put put = new Put(rowId);
    byte[] f1 = Bytes.toBytes("f1");
    byte[] id = Bytes.toBytes("id");
    byte[] value = Bytes.toBytes(102);
    put.addColumn(f1, id, value);
    table.put(put);
}
4.2 Batch insert

@Test
public void batchPut() throws IOException {
    DecimalFormat df = new DecimalFormat();
    df.applyPattern("0000");
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("ns1:t1");
    HTable table = (HTable) conn.getTable(tableName);
    // Buffer puts client-side instead of sending one RPC per put
    table.setAutoFlush(false);
    for (int i = 1; i < 10000; i++) {
        Put put = new Put(Bytes.toBytes("row" + df.format(i)));
        // Skip the WAL for speed (data may be lost on server failure)
        put.setWriteToWAL(false);
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(i));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom" + i));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(i % 100));
        table.put(put);
        if (i % 2000 == 0) {
            table.flushCommits();
        }
    }
    table.flushCommits();
}
4.3 Getting a row

@Test
public void testGet() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);
    byte[] rowId = Bytes.toBytes("row3");
    Get get = new Get(rowId);
    Result r = table.get(get);
    byte[] idValue = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"));
    System.out.println(Bytes.toInt(idValue));
}
4.4 Listing namespaces

@Test
public void listNameSpaces() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    NamespaceDescriptor[] ns = admin.listNamespaceDescriptors();
    for (NamespaceDescriptor n : ns) {
        System.out.println(n.getName());
    }
}
4.5 Creating a namespace

@Test
public void createNameSpace() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    NamespaceDescriptor nsd = NamespaceDescriptor.create("ns2").build();
    admin.createNamespace(nsd);
    NamespaceDescriptor[] ns = admin.listNamespaceDescriptors();
    for (NamespaceDescriptor n : ns) {
        System.out.println(n.getName());
    }
}
4.6 Creating a table

@Test
public void createTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    TableName tableName = TableName.valueOf("ns2:t2");
    HTableDescriptor tb1 = new HTableDescriptor(tableName);
    HColumnDescriptor col = new HColumnDescriptor("f1");
    tb1.addFamily(col);
    admin.createTable(tb1);
}
4.7 Listing the tables in a namespace

@Test
public void listTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    HTableDescriptor[] htd = admin.listTableDescriptorsByNamespace("ns2");
    for (HTableDescriptor hTableDescriptor : htd) {
        System.out.println(hTableDescriptor.getTableName());
    }
}
4.8 Disabling and dropping a table

@Test
public void disableTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    // A table must be disabled before it can be deleted
    admin.disableTable(TableName.valueOf("ns2:t2"));
    admin.deleteTable(TableName.valueOf("ns2:t2"));
}
4.9 Deleting data

@Test
public void deleteData() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);
    Delete delete = new Delete(Bytes.toBytes("row0001"));
    delete.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"));
    delete.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
    table.delete(delete);
}
4.10 Scanning a table (specific column family and columns)

@Test
public void scan() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("row0002"));
    scan.withStopRow(Bytes.toBytes("row0010"));
    ResultScanner rs = table.getScanner(scan);
    Iterator<Result> iterator = rs.iterator();
    while (iterator.hasNext()) {
        Result result = iterator.next();
        byte[] name = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        byte[] age = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"));
        byte[] id = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"));
        System.out.print(Bytes.toInt(id) + ",");
        System.out.print(Bytes.toString(name) + ",");
        System.out.println(Bytes.toInt(age));
    }
}
4.11 Scanning a table (specific column family)

@Test
public void scan2() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("row0001"));
    scan.withStopRow(Bytes.toBytes("row0010"));
    ResultScanner rs = table.getScanner(scan);
    Iterator<Result> iterator = rs.iterator();
    while (iterator.hasNext()) {
        Result result = iterator.next();
        // qualifier -> value for the given family
        NavigableMap<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes("f1"));
        for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
            String col = Bytes.toString(entry.getKey());
            String value = Bytes.toString(entry.getValue());
            System.out.print(col + ":" + value + ",");
        }
        System.out.println();
    }
}
4.12 Scanning a table (all families and columns)

@Test
public void scan3() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("ns1:t1");
    Table table = conn.getTable(tableName);
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("row0001"));
    scan.withStopRow(Bytes.toBytes("row0010"));
    ResultScanner rs = table.getScanner(scan);
    Iterator<Result> iterator = rs.iterator();
    while (iterator.hasNext()) {
        Result result = iterator.next();
        // family -> (qualifier -> (timestamp -> value))
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
        for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) {
            String f = Bytes.toString(entry.getKey());
            Map<byte[], NavigableMap<Long, byte[]>> colDataMap = entry.getValue();
            for (Map.Entry<byte[], NavigableMap<Long, byte[]>> ets : colDataMap.entrySet()) {
                String c = Bytes.toString(ets.getKey());
                Map<Long, byte[]> tsValueMap = ets.getValue();
                for (Map.Entry<Long, byte[]> e : tsValueMap.entrySet()) {
                    Long ts = e.getKey();
                    String value = Bytes.toString(e.getValue());
                    System.out.print(f + ":" + c + ":" + ts + "=" + value + ",");
                }
            }
        }
        System.out.println();
    }
}
5. HBase Architecture

5.1 HBase write path

WAL (write-ahead log)

(Incomplete, to be continued.)
5.2 HBase on HDFS

Data belonging to the same column family is stored in the same file.

Directory layout for table data:

hdfs://s201:8020/hbase/data/${namespace}/${table name}/${region name}/${column family name}/${file name}

Directory layout for WALs:

hdfs://s201:8020/hbase/WALs/${region server name: host,port,start timestamp}/
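For illustration, a store file and a WAL directory for this cluster might look like the following (the region encoded name is borrowed from the split example in section 6; the port and start code are made-up placeholder values):

hdfs://s201:8020/hbase/data/ns1/t1/3e0703e224357f9111ee569021726486/f1/<store file>
hdfs://s201:8020/hbase/WALs/s202,16020,1586006932311/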
5.3 Client interaction flow

When the HBase cluster starts, the master assigns regions to the region servers.

1. Contact ZooKeeper to find the region server (RS) hosting the meta table [/hbase/meta-region-server].
2. Look up the row key to locate the region server that serves it.
3. Cache the location information locally.
4. Contact the RegionServer.

The HRegionServer opens the HRegion object and creates a Store instance for each column family. A Store contains multiple StoreFile instances, which are lightweight wrappers around HFiles, plus a MemStore that buffers writes in memory.
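The meta table location from the first step can be inspected directly in ZooKeeper (a quick sketch, assuming the default /hbase znode root):

$zkCli.sh -server s201:2181
[zk: s201:2181(CONNECTED) 0] get /hbase/meta-region-server
(prints the binary-encoded server name of the region server hosting hbase:meta)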
6. Regions

6.1 Region splitting

HBase splits a region once its store files reach 10 GB (the default value of hbase.hregion.max.filesize).
<property>
  <name>hbase.hregion.max.filesize</name>
  <value>10737418240</value>
  <source>hbase-default.xml</source>
</property>
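The threshold can also be overridden for a single table from the shell (a sketch; the value is in bytes):

hbase> alter 'ns1:t1', MAX_FILESIZE => '1073741824'   # split this table's regions at 1 GB instead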
6.2 Splitting from the shell

$hbase>scan 'hbase:meta'   //inspect the metadata table
$hbase>split 'ns1:t1'      //split the table

//Split a single region (splits the second region from above again)
hbase(main):001:0> split 'ns1:t1,row5676,1586006932311.3e0703e224357f9111ee569021726486.','row8888'
0 row(s) in 0.5010 seconds
6.3 Moving a region manually

hbase(main):002:0> help 'move'
Move a region. Optionally specify target regionserver else we choose one at random.
NOTE: You pass the encoded region name, not the region name so this command is a little different to the others. The encoded region name is the hash suffix on region names: e.g. if the region name were TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396. then the encoded region name portion is 527db22f95c8a9e0116f0cc13c680396
A server name is its host, port plus startcode. For example: host187.example.com,60020,1289493121758
Examples:
  hbase> move 'ENCODED_REGIONNAME'
  hbase> move 'ENCODED_REGIONNAME', 'SERVER_NAME'
That is:

ENCODED_REGIONNAME can be found in the metadata table (hbase:meta).
SERVER_NAME is shown under /Home in the web UI.
6.4 Merging regions

hbase(main):003:0> help 'merge_region'
Merge two regions. Passing 'true' as the optional third parameter will force a merge ('force' merges regardless else merge will fail unless passed adjacent regions. 'force' is for expert use only).
NOTE: You must pass the encoded region name, not the full region name so this command is a little different from other region operations. The encoded region name is the hash suffix on region names: e.g. if the region name were TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396. then the encoded region name portion is 527db22f95c8a9e0116f0cc13c680396
Examples:
  hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME'
  hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME', true
6.5 Pre-splitting

Split the table in advance, at creation time.
create 'ns1:t2','f1',SPLITS => ['row3000','row6000']
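The same pre-split can be done through the Java API by passing split keys to createTable; a sketch using the 1.x client API from section 4 (table and family names mirror the shell example above):

@Test
public void createPresplitTable() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("ns1:t2"));
    desc.addFamily(new HColumnDescriptor("f1"));
    // Yields three regions: (-inf, row3000), [row3000, row6000), [row6000, +inf)
    byte[][] splitKeys = {
            Bytes.toBytes("row3000"),
            Bytes.toBytes("row6000")
    };
    admin.createTable(desc, splitKeys);
}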
7. Versioning

7.1 Retrieving versions

When creating a table, specify the number of versions kept per column family (i.e. the last N changes are retained).
create 'ns1:t3',{NAME=>'f1',VERSIONS=>3}
Then insert four records:

hbase(main):002:0> put 'ns1:t3','row1','f1','tom1'
0 row(s) in 0.1670 seconds
hbase(main):003:0> put 'ns1:t3','row1','f1','tom2'
0 row(s) in 0.0790 seconds
hbase(main):004:0> put 'ns1:t3','row1','f1','tom3'
0 row(s) in 0.0280 seconds
hbase(main):005:0> put 'ns1:t3','row1','f1','tom4'
0 row(s) in 0.0330 seconds
Query a given number of versions. (Note that although 4 versions are requested, the column family keeps only 3, so only 3 are returned.)

hbase(main):007:0> get 'ns1:t3','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN   CELL
 f1:     timestamp=1586336405057, value=tom4
 f1:     timestamp=1586336401776, value=tom3
 f1:     timestamp=1586336398687, value=tom2
7.2 Raw scans

This is an expert-level operation: RAW=>true.

hbase(main):011:0> put 'ns1:t3','row1','f1','tom5'
0 row(s) in 0.0230 seconds
hbase(main):012:0> scan 'ns1:t3',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW    COLUMN+CELL
 row1  column=f1:, timestamp=1586336766575, value=tom5
 row1  column=f1:, timestamp=1586336405057, value=tom4
 row1  column=f1:, timestamp=1586336401776, value=tom3
 row1  column=f1:, timestamp=1586336398687, value=tom2
hbase(main):020:0> get 'ns1:t3','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN   CELL
 f1:     timestamp=1586336766575, value=tom5
 f1:     timestamp=1586336405057, value=tom4
 f1:     timestamp=1586336401776, value=tom3
A raw scan therefore shows the complete history. Next, delete one version and observe the change: get automatically backfills an older version, while the raw scan shows the deletion as type=Delete.

hbase(main):025:0> delete 'ns1:t3','row1','f1',1586336405057
hbase(main):026:0> get 'ns1:t3','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN   CELL
 f1:     timestamp=1586336766575, value=tom5
 f1:     timestamp=1586336401776, value=tom3
 f1:     timestamp=1586336398687, value=tom2
hbase(main):027:0> scan 'ns1:t3',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW    COLUMN+CELL
 row1  column=f1:, timestamp=1586336766575, value=tom5
 row1  column=f1:, timestamp=1586336405057, type=Delete
 row1  column=f1:, timestamp=1586336405057, value=tom4
 row1  column=f1:, timestamp=1586336401776, value=tom3
 row1  column=f1:, timestamp=1586336398687, value=tom2
8. Other Parameters

8.1 TTL (time to live)

Expires data automatically.

Once the TTL has passed (and the data has been flushed, as the demo below shows), even a raw scan cannot see it.

TTL is specified in seconds: TTL=>10.
# Create a table with a TTL
hbase(main):031:0> create 'ns1:t4',{NAME=>'f1',TTL=>10,VERSIONS=>3}
# Insert one record
hbase(main):032:0> put 'ns1:t4','row1','f1','tom1'
# Query within the first 10 seconds
hbase(main):033:0> get 'ns1:t4','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN   CELL
 f1:     timestamp=1586337429892, value=tom1
# Query after 10 seconds
hbase(main):034:0> get 'ns1:t4','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN   CELL
# A raw scan can still see the data
hbase(main):035:0> scan 'ns1:t4',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW    COLUMN+CELL
 row1  column=f1:, timestamp=1586337429892, value=tom1
# Flush to disk!
hbase(main):036:0> flush 'ns1:t4'
# Raw scan again: the data is gone.
hbase(main):037:0> scan 'ns1:t4',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW    COLUMN+CELL
0 row(s) in 0.0240 seconds
8.2 KEEP_DELETED_CELLS

Controls whether data is still retained after its key is deleted: KEEP_DELETED_CELLS=>true.

# Create a table with KEEP_DELETED_CELLS
hbase(main):038:0> create 'ns1:t5',{NAME=>'f1',TTL=>10,VERSIONS=>3,KEEP_DELETED_CELLS=>true}
# Add one record
hbase(main):039:0> put 'ns1:t5','row1','f1','tom1'
# Query after 10 seconds: the data is gone
hbase(main):040:0> get 'ns1:t5','row1',{COLUMN=>'f1',VERSIONS=>4}
COLUMN   CELL
0 row(s) in 0.0140 seconds
# A raw scan still finds the data
hbase(main):041:0> scan 'ns1:t5',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW    COLUMN+CELL
 row1  column=f1:, timestamp=1586337719189, value=tom1
# Flush
hbase(main):042:0> flush 'ns1:t5'
0 row(s) in 0.2030 seconds
# Raw scan again: the record is indeed still there
hbase(main):043:0> scan 'ns1:t5',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
ROW    COLUMN+CELL
 row1  column=f1:, timestamp=1586337719189, value=tom1
9. Performance Tuning

These settings do not affect results, only performance.

9.1 Scanner caching (row level)

Scan scan = new Scan();
// Fetch 5000 rows per RPC round trip
scan.setCaching(5000);

9.2 Scanner batching (column level)

Scan scan = new Scan();
// Return at most 5 columns per Result
scan.setBatch(5);
9.3 Filters

9.3.1 Row filter

Scan scan = new Scan();
RowFilter rowFilter = new RowFilter(
        CompareFilter.CompareOp.LESS_OR_EQUAL,
        new BinaryComparator(Bytes.toBytes("row0100")));
scan.setFilter(rowFilter);

9.3.2 Family filter

Scan scan = new Scan();
FamilyFilter filter = new FamilyFilter(
        CompareFilter.CompareOp.LESS,
        new BinaryComparator(Bytes.toBytes("f2")));
scan.setFilter(filter);

9.3.3 Qualifier filter

Scan scan = new Scan();
QualifierFilter colfilter = new QualifierFilter(
        CompareFilter.CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("id")));
scan.setFilter(colfilter);

9.3.4 Value filter

Scan scan = new Scan();
ValueFilter filter = new ValueFilter(
        CompareFilter.CompareOp.EQUAL,
        new SubstringComparator("to"));
scan.setFilter(filter);

9.3.5 Dependent column filter

Scan scan = new Scan();
DependentColumnFilter filter = new DependentColumnFilter(
        Bytes.toBytes("f2"),
        Bytes.toBytes("addr"),
        true,
        CompareFilter.CompareOp.NOT_EQUAL,
        new BinaryComparator(Bytes.toBytes("beijing")));
scan.setFilter(filter);
9.3.6 Single column value filter

Filters on the value of a single column; rows whose value does not match are excluded entirely.

Scan scan = new Scan();
SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("f2"),
        Bytes.toBytes("name"),
        CompareFilter.CompareOp.NOT_EQUAL,
        new BinaryComparator(Bytes.toBytes("tom2.1")));
scan.setFilter(filter);
9.3.7 Prefix filter

Filters on the row key prefix, like WHERE rowkey LIKE 'row22%' in SQL.

Scan scan = new Scan();
PrefixFilter filter = new PrefixFilter(Bytes.toBytes("row222"));
scan.setFilter(filter);
9.3.8 Page filter

Row-key-based paging; the page size is applied per region during the scan.

When results return to the client they must be merged across regions, so more rows than the page size may come back; see the paging loop sketch below.

Scan scan = new Scan();
PageFilter filter = new PageFilter(10);
scan.setFilter(filter);
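A common client-side paging pattern restarts the scan after the last row of the previous page; appending a zero byte yields the smallest row key strictly greater than the last one seen. A sketch in the style of the tests above (the table name is the ns1:t1 example from section 4):

@Test
public void testPaging() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    Table table = conn.getTable(TableName.valueOf("ns1:t1"));
    final byte[] POSTFIX = new byte[]{0x00};
    byte[] lastRow = null;
    while (true) {
        Scan scan = new Scan();
        scan.setFilter(new PageFilter(10));
        if (lastRow != null) {
            // Resume just after the last row of the previous page
            scan.withStartRow(Bytes.add(lastRow, POSTFIX));
        }
        int rowsInPage = 0;
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            rowsInPage++;
            lastRow = result.getRow();
            System.out.println(Bytes.toString(lastRow));
        }
        scanner.close();
        if (rowsInPage == 0) {
            break; // no more rows
        }
    }
}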
9.3.9 KeyOnly filter

Returns only the keys, discarding the values.

Scan scan = new Scan();
KeyOnlyFilter filter = new KeyOnlyFilter();
scan.setFilter(filter);
9.3.10 FilterList

Combines filters for complex queries. The example below selects rows matching (age <= 13 AND name LIKE 't%') OR (age > 13 AND name LIKE '%t').

@Test
public void testComboFilter() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tname = TableName.valueOf("ns1:t7");
    Scan scan = new Scan();
    // age <= 13
    SingleColumnValueFilter ftl = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("age"),
            CompareFilter.CompareOp.LESS_OR_EQUAL,
            new BinaryComparator(Bytes.toBytes("13"))
    );
    // name starts with 't'
    SingleColumnValueFilter ftr = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("name"),
            CompareFilter.CompareOp.EQUAL,
            new RegexStringComparator("^t")
    );
    FilterList ft = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    ft.addFilter(ftl);
    ft.addFilter(ftr);
    // age > 13
    SingleColumnValueFilter fbl = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("age"),
            CompareFilter.CompareOp.GREATER,
            new BinaryComparator(Bytes.toBytes("13"))
    );
    // name ends with 't'
    SingleColumnValueFilter fbr = new SingleColumnValueFilter(
            Bytes.toBytes("f2"),
            Bytes.toBytes("name"),
            CompareFilter.CompareOp.EQUAL,
            new RegexStringComparator("t$")
    );
    FilterList fb = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    fb.addFilter(fbl);
    fb.addFilter(fbr);
    // OR of the two AND groups
    FilterList fall = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    fall.addFilter(ft);
    fall.addFilter(fb);
    scan.setFilter(fall);
    Table t = conn.getTable(tname);
    ResultScanner rs = t.getScanner(scan);
    Iterator<Result> it = rs.iterator();
    while (it.hasNext()) {
        Result r = it.next();
        byte[] f1id = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"));
        byte[] f2id = r.getValue(Bytes.toBytes("f2"), Bytes.toBytes("id"));
        byte[] f1name = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        byte[] f2name = r.getValue(Bytes.toBytes("f2"), Bytes.toBytes("name"));
        System.out.println(f1id + " : " + f2id + " : "
                + Bytes.toString(f1name) + " : " + Bytes.toString(f2name));
    }
}
9.3.11 Bloom filters

Bloom filters are configured per column family (e.g. BLOOMFILTER => 'ROW' or 'ROWCOL' at table creation) and let reads skip store files that definitely do not contain the requested row or row+column.

10. Counters

Beyond the features discussed above, HBase has an advanced feature: counters. Many statistics-gathering applications, such as clickstream or online ad impression tracking, collect events in log files for later analysis. Counters let you keep these statistics in real time, avoiding high-latency batch processing.

Key properties: fast and atomic!
Shell
$hbase>incr 'ns1:t8','row1','f1:click',1
$hbase>get_counter 'ns1:t8','row1','f1:click'
Java API
@Test
public void testIncr() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tname = TableName.valueOf("ns1:t8");
    Table t = conn.getTable(tname);
    Increment incr = new Increment(Bytes.toBytes("row1"));
    incr.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("daily"), 1);
    incr.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("weekly"), 10);
    incr.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("monthly"), 100);
    t.increment(incr);
}
11. Coprocessors

Code that runs on the server side, comparable to stored procedures or triggers.

Observer

An observer is like a trigger: it is event-based. When an action occurs, the corresponding callback method is invoked.
[hbase-site.xml]
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>
<property>
  <name>hbase.coprocessor.wal.classes</name>
  <value>coprocessor.WALObserverExample, bar.foo.MyWALObserver</value>
</property>
Endpoint

An endpoint is like a stored procedure.

Steps:

Add the dependencies:
<dependencies>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.1</version>
  </dependency>
</dependencies>
Write a custom observer.

Methods whose names start with pre are invoked before the operation; post methods are invoked after it.
package cn.wangbowen.hbasedemo.coprocessor;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.FileWriter;
import java.io.IOException;
import java.util.List;

public class MyRegionObserver extends BaseRegionObserver {

    // Append a trace line to a local file on the region server
    private void outInfo(String str) {
        try {
            FileWriter fw = new FileWriter("/home/centos/coprocessor.txt", true);
            fw.write(str + "\r\n");
            fw.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        super.start(e);
        outInfo("MyRegionObserver.start()");
    }

    @Override
    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
        super.preOpen(e);
        outInfo("MyRegionObserver.preOpen()");
    }

    @Override
    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
        super.postOpen(e);
        outInfo("MyRegionObserver.postOpen()");
    }

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
        super.preGetOp(e, get, results);
        String rowkey = Bytes.toString(get.getRow());
        outInfo("MyRegionObserver.preGetOp() : rowkey = " + rowkey);
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
        super.postGetOp(e, get, results);
        String rowkey = Bytes.toString(get.getRow());
        outInfo("MyRegionObserver.postGetOp() : rowkey = " + rowkey);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        super.prePut(e, put, edit, durability);
        String rowkey = Bytes.toString(put.getRow());
        outInfo("MyRegionObserver.prePut() : rowkey = " + rowkey);
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        super.postPut(e, put, edit, durability);
        String rowkey = Bytes.toString(put.getRow());
        outInfo("MyRegionObserver.postPut() : rowkey = " + rowkey);
    }

    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        super.preDelete(e, delete, edit, durability);
        String rowkey = Bytes.toString(delete.getRow());
        outInfo("MyRegionObserver.preDelete() : rowkey = " + rowkey);
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        super.postDelete(e, delete, edit, durability);
        String rowkey = Bytes.toString(delete.getRow());
        outInfo("MyRegionObserver.postDelete() : rowkey = " + rowkey);
    }
}
Register the coprocessor in hbase-site.xml and distribute the configuration:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>cn.wangbowen.hbasedemo.coprocessor.MyRegionObserver</value>
</property>
Export the jar, then upload and distribute it to the hbase/lib directory on the cluster nodes.

Restart the cluster for the change to take effect.
12. RowKey Design Principles

Hash a field and use the hash as a prefix so that keys are spread evenly across regions; do not lead with a time dimension. (A sketch follows below.)

Order the fields that make up the row key according to the actual business query patterns.
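A minimal sketch of the hash-prefix idea (the field names and bucket count are illustrative, not from the original notes):

// Builds a key like "07:user123:20200405" so that writes spread across
// regions instead of hot-spotting on the time dimension.
public static String saltedRowKey(String userId, String day) {
    int buckets = 16; // choose to match (a multiple of) your region count
    int salt = (userId.hashCode() & 0x7fffffff) % buckets;
    return String.format("%02d:%s:%s", salt, userId, day);
}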
13. Integrating HBase with Hive

Hive offers richer SQL query capabilities, while HBase provides an excellent storage structure, so Hive can be used to query data stored in HBase.

Integration steps:

Copy the HBase-related jars into Hive's lib directory:
hbase-server-2.2.4.jar
hbase-client-2.2.4.jar
hbase-it-2.2.4.jar
hbase-hadoop2-compat-2.2.4.jar
hbase-hadoop-compat-2.2.4.jar
hbase-common-2.2.4.jar
# This one is not in HBase's lib directory; download it from:
# http://www.java2s.com/Code/Jar/h/Downloadhighscalelib10jar.htm
high-scale-lib-1.1.1-sources.jar
Copy command (download and upload the high-scale-lib jar yourself):

cp hbase-server-2.2.4.jar hbase-client-2.2.4.jar hbase-it-2.2.4.jar \
   hbase-hadoop2-compat-2.2.4.jar hbase-hadoop-compat-2.2.4.jar \
   hbase-common-2.2.4.jar /soft/hive/lib/
In hive-site.xml under Hive's conf directory, configure ZooKeeper; Hive uses this parameter to connect to the HBase cluster.

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>s201,s202,s203</value>
</property>
Create a table in HBase (see the earlier sections on creating tables and inserting data).

Start Hive and create the mapping. (Start MySQL first, because the Hive metastore lives in it.)
create external table t1(
  key string,
  name string,
  id int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,f1:name,f1:id")
TBLPROPERTIES("hbase.table.name" = "ns1:t1");
The table created this way is not a new table in HBase; it exists in Hive as a mapping. An external table created the ordinary way would contain no data, because the data lives in HBase rather than in Hive.

Query it:

hive (default)> select * from t1;
row1 tom 100