avatar

目录
Storm学习笔记

Storm学习笔记

一、简介

1.1 特点

Apache Storm是一个免费的开源分布式实时计算系统。

Apache Storm使得可靠处理无限数据流变得容易,实时处理就像Hadoop批处理一样。

Apache Storm很简单,可以与任何编程语言一起使用,并且使用起来很有趣!

Apache Storm有许多用例:实时分析,在线机器学习,连续计算,分布式RPC,ETL等。

Apache Storm速度很快:基准测试表明它每秒可处理每个节点超过一百万个元组。它具有可扩展性,容错性,可确保您的数据将得到处理,并且易于设置和操作。

Apache Storm与您已经使用的排队和数据库技术集成。

Apache Storm拓扑会消耗数据流,并以任意复杂的方式处理这些流,但是可以根据需要在计算的每个阶段之间重新分配流。在教程中阅读更多内容。

1.2 Storm对比Hadoop

Storm Hadoop
实时流处理 批处理
无状态 有状态
使用zk协同的主从架构 无zk的主从结构
每秒处理数万消息 HDFS MR数分钟、数小时
不会主动停止 终有完成的时候

1.3 核心概念

avatar

名称 介绍
Tuple 主要的数据结构,有序元素的列表。
Stream Tuple的序列。
Spouts 数据流源头。可以读取kafka队列消息。可以自定义。
Bolts 转接头、逻辑处理单元。
spout的数据传递个bolt,bolt计算,完成后产生新的数据。
IBolt是核心接口。
Topology Spout + bolt连接在一起形成一个Topology,形成有向图,定点就是计算,边是数据流。
task Bolt中每个Spout或者bolt都是一个task。

1.4 架构

avatar

名称 介绍
Nimbus master节点。
核心组件,运行Topology。
分析Topology并收集运行task。分发task给supervisor.
监控Topology。
无状态,依靠zk监控Topology的运行状况。
Supervisor 每个supervisor有n个worker进程,负责代理task给worker。
worker在孵化Executor线程最终运行task。
storm使用内部消息系统在nimbus和supervisor之间进行通信。
接受nimbus指令,管理worker进程完成task派发。
worker 执行特定的task,worker本身不执行任务,而是孵化executors,让executors执行task。
Executor 本质上由worker进程孵化出来的一个线程而已。
executor运行task都属于同一spout或者bolt。
task 执行实际上的任务处理。或者是Spout或者是bolt。

1.5 工作流程

  1. nimbus等待提交的Topology
  2. 提交Topology后,nimbus收集task,
  3. nimbus分发task给所有可用的supervisor
  4. supervisor周期性发送心跳给nimbus表示自己还活着。
  5. 如果supervisor挂掉,不会发送心跳给nimubs,nimbus将task发送给其他的supervisor
  6. nimubs挂掉,super会继续执行自己task。
  7. task完成后,supervisor等待新的task
  8. 同时,挂掉的nimbus可以通过监控工具软件自动重启。

二、集群部署

  1. 下载 apache-storm-2.1.0.tar.gz

  2. 安装、配置环境变量

    Code
    $tar -zxvf apache-storm-2.1.0.tar.gz -C /soft
    [wbw@s201 /soft]$ln -s apache-storm-2.1.0 storm

    # 环境变量
    # storm
    export STORM_HOME=/soft/storm
    export PATH=$PATH:$STORM_HOME/bin
  3. 验证安装

    Code
    [wbw@s201 /soft/storm/bin]$./storm version
    Storm 2.1.0
  4. 配置 [storm/conf/storm.yaml]

    先拷贝一份

    Code
    $cp storm.yaml storm.yaml.bak

    配置

    yaml
    storm.zookeeper.servers:
    - "s201"
    - "s202"
    - "s203"

    nimbus.seeds: ["s201"]

    ui.host: 0.0.0.0
    ui.port: 8080

    supervisor.slots.prots:
    - 6700
    - 6701
    - 6702
    - 6703
  5. 分发到S202~S204,并配置好环境变量

    Code
    $scp -r apache-storm-2.1.0 wbw@s202:/soft/
    $scp -r apache-storm-2.1.0 wbw@s203:/soft/
    $scp -r apache-storm-2.1.0 wbw@s204:/soft/
  6. 启动进程

    Code
    # 启动s201 nimbus进程
    $storm nimbus &

    # 启动S202~S204 supervisor进程
    $storm supervisor &

    # 启动s201的UI进程
    $storm ui &

    # 启动日志进程(在WEBUI中可以查看log)
    $storm logviewer &
  7. 通过webui查看

    Code
    http://s201:8080

三、单词计数案例

3.1 导入依赖

xml
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
</dependency>

3.2 编写Spout数据源

java
package cn.wangbowen.storm.wordcount;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
* WordSpout class
*
* @author BoWenWang
* @date 2020/4/19 16:14
*/
public class WordSpout implements IRichSpout {

// 上下文
private TopologyContext context;
// 输出收集器
private SpoutOutputCollector collector;
// 随机发生器
private Random randomGenerator = new Random();
private Integer idx = 0;


@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.context = topologyContext;
this.collector = spoutOutputCollector;
}

@Override
public void close() {

}

@Override
public void activate() {

}

@Override
public void deactivate() {

}

/**
* 产生数据,下一个元组,运行的时候,是不断执行这个方法的。
*/
@Override
public void nextTuple() {
if (this.idx++ < 100) {
List<String> lines = new ArrayList<>();
lines.add("hello world");
lines.add("hello tom");
lines.add("hello storm");

int i = randomGenerator.nextInt(3);
// 输出元组
this.collector.emit(new Values(lines.get(i)));
}
}

@Override
public void ack(Object o) {

}

@Override
public void fail(Object o) {

}

/**
* 声明输出数据(对应上面的输出元组)
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

3.3 编写单词分割bolt

java
package cn.wangbowen.storm.wordcount;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
* WordCreatorBolt class
*
* @author BoWenWang
* @date 2020/4/19 16:28
*/
public class WordCreatorBolt implements IRichBolt {

private OutputCollector collector;

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}

/**
* 处理传来的元组
*/
@Override
public void execute(Tuple tuple) {
// 根据下表获取元组对应位置的数据
String line = tuple.getString(0);
String[] words = line.split(" ");
for (String word : words) {
// 产生新的tuple交给下一个blot
this.collector.emit(new Values(word, 1));
}
}


@Override
public void cleanup() {

}

/**
* 声明字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "count"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

3.4 编写单词计数bolt

java
package cn.wangbowen.storm.wordcount;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
* WordCounterBolt class
*
* @author BoWenWang
* @date 2020/4/19 16:35
*/
public class WordCounterBolt implements IRichBolt {

private OutputCollector collector;
// Map用来输出结果
Map<String, Integer> counterMap;

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
this.counterMap = new HashMap<>(16);
}

/**
* 计算
*/
@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer count = tuple.getInteger(1);
if (!counterMap.containsKey(word)) {
counterMap.put(word, 1);
} else {
counterMap.put(word, counterMap.get(word) + 1);
}
}

/**
* 处理完时,被执行的方法,但是一旦开始就一直在运行。除非kill掉,会执行
*/
@Override
public void cleanup() {
for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

3.5 编写APP主类

java
public class App {
public static void main(String[] args) throws Exception {
// Topology 属性设置
TopologyBuilder builder = new TopologyBuilder();
// 设置Spout
builder.setSpout("spout", new WordSpout(), 3);
// 设置creatorBolt(这里的shuffleGrouping对应上面spout的id)
builder.setBolt("creatorBolt", new WordCreatorBolt(), 3).shuffleGrouping("spout");
// 设置counterBlot(这里的fieldsGrouping对应上一个bolt的id,并以其中一个字段作为分组条件)
builder.setBolt("counterBolt", new WordCounterBolt()).fieldsGrouping("creatorBolt", new Fields("word"));


Config config = new Config();
config.setDebug(true);
// 本地集群模式
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCountTopology", config, builder.createTopology());

// 停止集群查看结果
Thread.sleep(10000);
cluster.shutdown();
}
}

等待运行结束,查看结果:

Code
storm : 36
tom : 32
world : 32
hello : 100

3.6 集群模式

  1. 修改APP类

    java
    // 本地集群模式
    //LocalCluster cluster = new LocalCluster();
    //cluster.submitTopology("WordCountTopology", config, builder.createTopology());

    // 停止集群查看结果
    //Thread.sleep(10000);
    //cluster.shutdown();

    // 集群模式
    StormSubmitter.submitTopology("WordCountTopology", config, builder.createTopology());
  2. 导出JAR包

  3. 在Storm节点上运行

    Code
    [wbw@s202 /home/wbw/tmp]$storm jar Storm-1.0-SNAPSHOT.jar cn.wangbowen.storm.wordcount.App
  4. 查看WEIUI

    avatar

四、并发度

并发度 == 所有的task个数的总和。即,N个task运行在M个executor上,executor又在K个worker中。

4.1 设置 worker 个数

java
// setNumWorkers
Config config = new Config();
config.setNumWorkers(2);

4.2 设置 executors 个数

java
// 设置Spout的并发暗示,第三个参数即executor个数(线程数)
builder.setSpout("spout", new WordSpout(), 3);
// 设置bolt的并发暗示
builder.setBolt("creatorBolt", new WordCreatorBolt(), 3).shuffleGrouping("spout");

线程数 == cpu的内核数。

4.3 设置 task 个数

java
// setNumTasks
builder.setBolt("creatorBolt", new WordCreatorBolt(), 3).shuffleGrouping("spout").setNumTasks(3);

五、分组

5.1 all(暂无)

5.2 direct(暂无)

5.3 global(暂无)

5.4 自定义

  1. 自定义CustomStreamGrouping类

    java
    /**
    * 自定义分组
    */
    public class MyGrouping implements CustomStreamGrouping {

    //接受目标任务的id集合
    private List<Integer> targetTasks ;

    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks ;
    }

    public List<Integer> chooseTasks(int taskId, List<Object> values) {
    List<Integer> subTaskIds = new ArrayList<Integer>();
    for(int i = 0 ; i <= targetTasks.size() / 2 ; i ++){
    subTaskIds.add(targetTasks.get(i));
    }
    return subTaskIds;
    }
    }
  2. 设置分组策略

    java
    public class App {
    public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    //设置Spout
    builder.setSpout("wcspout", new WordCountSpout()).setNumTasks(2);
    //设置creator-Bolt(这里设置自己的分组)
    builder.setBolt("split-bolt", new SplitBolt(),4).customGrouping("wcspout",new MyGrouping()).setNumTasks(4);

    Config conf = new Config();
    conf.setNumWorkers(2);
    conf.setDebug(true);

    /**
    * 本地模式storm
    */
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("wc", conf, builder.createTopology());
    System.out.println("hello world");
    }
    }

六、消息消费

6.1 ack、fail

  1. 发送的tuple需要携带msgId

    java
    // index 为消息ID
    collector.emit(new Values(line),index);
  2. bolt中需要对tuple进行确认(ack() | fail())

    java
    public void execute(Tuple tuple) {
    String line = tuple.getString(0);
    System.out.println(this + " : " + line);
    if(new Random().nextBoolean()){
    //确认
    collector.ack(tuple);
    }
    else{
    //失败
    collector.fail(tuple);
    }
    }
  3. 实现spout的ack()和fail()方法

    java
    public void ack(Object msgId) {
    System.out.println(this + " : ack() : " + msgId);
    }

    public void fail(Object msgId) {
    System.out.println(this + " : fail() : " + msgId);
    }

6.2 确保消息消费

用一个队列来存放消息队列,如果失败再fail函数里面进行重发。N次后丢弃。

七、整合 Kafka

storm以消费者从kafka队列中提取消息。

步骤

  1. 添加依赖

    xml
    <!-- 整合kafka -->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
    <exclusions>
    <exclusion>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    </exclusion>
    </exclusions>
    </dependency>

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
    </dependency>

    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.2.2</version>
    </dependency>
  2. 编写Blot类

    java
    package cn.wangbowen.storm.kafka;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;

    import java.util.Map;

    /**
    * MyBlot class
    *
    * @author BoWenWang
    * @date 2020/4/21 14:01
    */
    public class MyBlot implements IRichBolt {

    private TopologyContext context;
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    this.context = topologyContext;
    this.collector = outputCollector;
    }

    /**
    * 打印消息
    */
    @Override
    public void execute(Tuple tuple) {
    System.out.println(tuple.getString(0));
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    return null;
    }
    }
  3. 编写App类

    java
    package cn.wangbowen.storm.kafka;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.*;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;

    import java.util.UUID;

    /**
    * App class
    *
    * @author BoWenWang
    * @date 2020/4/21 13:51
    */
    public class App {
    public static void main(String[] args) throws Exception {
    // zookeeper连接串
    String zkConnString = "192.168.174.201:2181,192.168.174.202:2181,192.168.174.203:2181";
    // 连接broker
    BrokerHosts hosts = new ZkHosts(zkConnString);

    // kafka主题
    String topicName = "test4";
    // 准备spout配置
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    // kafkaSpout
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    // 准备topology
    TopologyBuilder builder = new TopologyBuilder();
    // 设置选择spout
    builder.setSpout("kafkaSpout", kafkaSpout);
    // 设置选择bolt
    builder.setBolt("myBolt", new MyBlot()).shuffleGrouping("kafkaSpout");
    // 本地执行
    LocalCluster cluster = new LocalCluster();
    Config config = new Config();
    cluster.submitTopology("storm-kafka", config, builder.createTopology());
    }
    }
  4. 启动ZK、Kafka、storm集群、运行App方法

  5. 使用kafka控制台生产者发送消息给kafka

    Code
    bin/kafka-console-producer.sh --broker-list s202:9092 --topic test4
    >hello
  6. 看storm控制台是否打印结果(消费)

八、整合 HBase

  1. 添加依赖

    xml
    <!-- 整合HBase -->
    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.1.3</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.6</version>
    </dependency>
  2. 编写数据源Spout类(引用上面的单词案例)

    java
    package cn.wangbowen.storm.hbase;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;

    /**
    * WordSpout class
    *
    * @author BoWenWang
    * @date 2020/4/19 16:14
    */
    public class WordSpout implements IRichSpout {

    // 上下文
    private TopologyContext context;
    // 输出收集器
    private SpoutOutputCollector collector;
    // 随机发生器
    private Random randomGenerator = new Random();
    private Integer idx = 0;


    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    this.context = topologyContext;
    this.collector = spoutOutputCollector;
    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    /**
    * 产生数据,下一个元组,运行的时候,是不断执行这个方法的。
    */
    @Override
    public void nextTuple() {
    if (this.idx++ < 100) {
    List<String> lines = new ArrayList<>();
    lines.add("hello world");
    lines.add("hello tom");
    lines.add("hello storm");

    int i = randomGenerator.nextInt(3);
    // 输出元组
    this.collector.emit(new Values(lines.get(i)));
    }
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }

    /**
    * 声明输出数据(对应上面的输出元组)
    */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("line"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    return null;
    }
    }
  3. 编写切分Blot类

    java
    package cn.wangbowen.storm.hbase;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    import java.util.Map;

    /**
    * SplitBolt class
    *
    * @author BoWenWang
    * @date 2020/4/19 16:28
    */
    public class SplitBolt implements IRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    this.collector = outputCollector;
    }

    /**
    * 处理传来的元组
    */
    @Override
    public void execute(Tuple tuple) {
    // 根据下表获取元组对应位置的数据
    String line = tuple.getString(0);
    String[] words = line.split(" ");
    for (String word : words) {
    // 产生新的tuple交给下一个blot
    this.collector.emit(new Values(word, 1));
    }
    }


    @Override
    public void cleanup() {

    }

    /**
    * 声明字段
    */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    return null;
    }
    }
  4. 编写HbaseBlot类

    java
    package cn.wangbowen.storm.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;

    import java.io.IOException;
    import java.util.Map;

    /**
    * HBaseBolt class
    *
    * @author BoWenWang
    * @date 2020/4/21 16:35
    */
    public class HBaseBolt implements IRichBolt {

    private Table table;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    try {
    // 创建conf对象
    Configuration conf = HBaseConfiguration.create();
    // 通过连接工厂创建连接对象
    Connection conn = ConnectionFactory.createConnection(conf);
    // 获得表名对象
    TableName tableName = TableName.valueOf("ns1:word_count");
    // 获得table
    table = conn.getTable(tableName);

    } catch (IOException e) {
    e.printStackTrace();
    }

    }

    /**
    * 获取单词-个数,增量到HBase
    */
    @Override
    public void execute(Tuple tuple) {
    String word = tuple.getString(0);
    Integer count = tuple.getInteger(1);

    byte[] rowKey = Bytes.toBytes(word);
    byte[] f = Bytes.toBytes("f1");
    byte[] c = Bytes.toBytes("count");
    try {
    table.incrementColumnValue(rowKey, f, c, count);
    } catch (IOException e) {
    e.printStackTrace();
    }
    }


    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    return null;
    }
    }
  5. 编写App类

    java
    package cn.wangbowen.storm.hbase;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;

    /**
    * App class
    *
    * @author BoWenWang
    * @date 2020/4/21 15:45
    */
    public class App {
    public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("wcSpout", new WordSpout());
    builder.setBolt("splitBlot", new SplitBolt()).shuffleGrouping("wcSpout");
    builder.setBolt("hbaseBlot", new HBaseBolt()).shuffleGrouping("splitBlot");

    Config config = new Config();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("wc-Hbase", config, builder.createTopology());
    }
    }
  6. hbase-site.xmlhdfs-site.xml 配置文件放到 resources 下

  7. 启动ZK、Storm、hadoop、Hbase集群

  8. 创建表 ns1:word_count

    Code
    $hbase shell
    $hbase shell>create 'ns1:word_count' , 'f1'
  9. 运行App(可能要等一会)

  10. 查看结果

    Code
    hbase(main):078:0> scan 'ns1:word_count'
    ROW COLUMN+CELL
    hello column=f1:count, timestamp=1587461666807, value=\x00\x00\x00\x00\x00\x00\x00d
    storm column=f1:count, timestamp=1587461666817, value=\x00\x00\x00\x00\x00\x00\x00!
    tom column=f1:count, timestamp=1587461666795, value=\x00\x00\x00\x00\x00\x00\x00\x1E
    world column=f1:count, timestamp=1587461666803, value=\x00\x00\x00\x00\x00\x00\x00%
    4 row(s) in 0.0130 seconds

    利用之前的HBase JAVA API 查看

    java
    /**
    * 查看数据
    */
    @Test
    public void get() throws IOException {
    // 创建conf对象
    Configuration conf = HBaseConfiguration.create();
    // 通过连接工厂创建连接对象
    Connection conn = ConnectionFactory.createConnection(conf);
    // 获得表名对象
    TableName tableName = TableName.valueOf("ns1:word_count");
    // 获得table
    Table table = conn.getTable(tableName);
    // 通过bytes工具类创建字节数组(将字符串)
    byte[] rowId = Bytes.toBytes("hello");
    // 创建get对象
    Get get = new Get(rowId);
    Result r = table.get(get);
    byte[] idValue = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("count"));
    System.out.println(Bytes.toLong(idValue));
    }

    打印结果:

    Code
    100
文章作者: IT小王
文章链接: https://wangbowen.cn/2020/04/18/Storm%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 IT小王

评论