Storm学习笔记
一、简介
1.1 特点
Apache Storm是一个免费的开源分布式实时计算系统。
Apache Storm使得可靠处理无限数据流变得容易,实时处理就像Hadoop批处理一样。
Apache Storm很简单,可以与任何编程语言一起使用,并且使用起来很有趣!
Apache Storm有许多用例:实时分析,在线机器学习,连续计算,分布式RPC,ETL等。
Apache Storm速度很快:基准测试表明它每秒可处理每个节点超过一百万个元组。它具有可扩展性,容错性,可确保您的数据将得到处理,并且易于设置和操作。
Apache Storm与您已经使用的排队和数据库技术集成。
Apache Storm拓扑会消耗数据流,并以任意复杂的方式处理这些流,但是可以根据需要在计算的每个阶段之间重新分配流。在教程中阅读更多内容。
1.2 Storm对比Hadoop
Storm | Hadoop |
---|---|
实时流处理 | 批处理 |
无状态 | 有状态 |
使用zk协同的主从架构 | 无zk的主从结构 |
每秒处理数万消息 | HDFS MR数分钟、数小时 |
不会主动停止 | 终有完成的时候 |
1.3 核心概念
名称 | 介绍 |
---|---|
Tuple | 主要的数据结构,有序元素的列表。 |
Stream | Tuple的序列。 |
Spouts | 数据流源头。可以读取kafka队列消息。可以自定义。 |
Bolts | 转接头、逻辑处理单元。 spout的数据传递个bolt,bolt计算,完成后产生新的数据。 IBolt是核心接口。 |
Topology | Spout + bolt连接在一起形成一个Topology,形成有向图,定点就是计算,边是数据流。 |
task | Bolt中每个Spout或者bolt都是一个task。 |
1.4 架构
名称 | 介绍 |
---|---|
Nimbus | master节点。 核心组件,运行Topology。 分析Topology并收集运行task。分发task给supervisor. 监控Topology。 无状态,依靠zk监控Topology的运行状况。 |
Supervisor | 每个supervisor有n个worker进程,负责代理task给worker。 worker在孵化Executor线程最终运行task。 storm使用内部消息系统在nimbus和supervisor之间进行通信。 接受nimbus指令,管理worker进程完成task派发。 |
worker | 执行特定的task,worker本身不执行任务,而是孵化executors,让executors执行task。 |
Executor | 本质上由worker进程孵化出来的一个线程而已。 executor运行task都属于同一spout或者bolt。 |
task | 执行实际上的任务处理。或者是Spout或者是bolt。 |
1.5 工作流程
- nimbus等待提交的Topology
- 提交Topology后,nimbus收集task,
- nimbus分发task给所有可用的supervisor
- supervisor周期性发送心跳给nimbus表示自己还活着。
- 如果supervisor挂掉,不会发送心跳给nimubs,nimbus将task发送给其他的supervisor
- nimubs挂掉,super会继续执行自己task。
- task完成后,supervisor等待新的task
- 同时,挂掉的nimbus可以通过监控工具软件自动重启。
二、集群部署
下载 apache-storm-2.1.0.tar.gz
安装、配置环境变量
Code$tar -zxvf apache-storm-2.1.0.tar.gz -C /soft
[wbw@s201 /soft]$ln -s apache-storm-2.1.0 storm
# 环境变量
# storm
export STORM_HOME=/soft/storm
export PATH=$PATH:$STORM_HOME/bin验证安装
Code[wbw@s201 /soft/storm/bin]$./storm version
Storm 2.1.0配置 [storm/conf/storm.yaml]
先拷贝一份
Code$cp storm.yaml storm.yaml.bak
配置
yamlstorm.zookeeper.servers:
- "s201"
- "s202"
- "s203"
nimbus.seeds: ["s201"]
ui.host: 0.0.0.0
ui.port: 8080
supervisor.slots.prots:
- 6700
- 6701
- 6702
- 6703分发到S202~S204,并配置好环境变量
Code$scp -r apache-storm-2.1.0 wbw@s202:/soft/
$scp -r apache-storm-2.1.0 wbw@s203:/soft/
$scp -r apache-storm-2.1.0 wbw@s204:/soft/启动进程
Code# 启动s201 nimbus进程
$storm nimbus &
# 启动S202~S204 supervisor进程
$storm supervisor &
# 启动s201的UI进程
$storm ui &
# 启动日志进程(在WEBUI中可以查看log)
$storm logviewer &通过webui查看
Codehttp://s201:8080
三、单词计数案例
3.1 导入依赖
<dependency> |
3.2 编写Spout数据源
package cn.wangbowen.storm.wordcount; |
3.3 编写单词分割bolt
package cn.wangbowen.storm.wordcount; |
3.4 编写单词计数bolt
package cn.wangbowen.storm.wordcount; |
3.5 编写APP主类
public class App { |
等待运行结束,查看结果:
storm : 36 |
3.6 集群模式
修改APP类
java// 本地集群模式
//LocalCluster cluster = new LocalCluster();
//cluster.submitTopology("WordCountTopology", config, builder.createTopology());
// 停止集群查看结果
//Thread.sleep(10000);
//cluster.shutdown();
// 集群模式
StormSubmitter.submitTopology("WordCountTopology", config, builder.createTopology());导出JAR包
在Storm节点上运行
Code[wbw@s202 /home/wbw/tmp]$storm jar Storm-1.0-SNAPSHOT.jar cn.wangbowen.storm.wordcount.App
查看WEIUI
四、并发度
并发度 == 所有的task个数的总和。即,N个task运行在M个executor上,executor又在K个worker中。
4.1 设置 worker 个数
// setNumWorkers |
4.2 设置 executors 个数
// 设置Spout的并发暗示,第三个参数即executor个数(线程数) |
线程数 == cpu的内核数。
4.3 设置 task 个数
// setNumTasks |
五、分组
5.1 all(暂无)
5.2 direct(暂无)
5.3 global(暂无)
5.4 自定义
自定义CustomStreamGrouping类
java/**
* 自定义分组
*/
public class MyGrouping implements CustomStreamGrouping {
//接受目标任务的id集合
private List<Integer> targetTasks ;
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks ;
}
public List<Integer> chooseTasks(int taskId, List<Object> values) {
List<Integer> subTaskIds = new ArrayList<Integer>();
for(int i = 0 ; i <= targetTasks.size() / 2 ; i ++){
subTaskIds.add(targetTasks.get(i));
}
return subTaskIds;
}
}设置分组策略
javapublic class App {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
//设置Spout
builder.setSpout("wcspout", new WordCountSpout()).setNumTasks(2);
//设置creator-Bolt(这里设置自己的分组)
builder.setBolt("split-bolt", new SplitBolt(),4).customGrouping("wcspout",new MyGrouping()).setNumTasks(4);
Config conf = new Config();
conf.setNumWorkers(2);
conf.setDebug(true);
/**
* 本地模式storm
*/
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wc", conf, builder.createTopology());
System.out.println("hello world");
}
}
六、消息消费
6.1 ack、fail
发送的tuple需要携带msgId
java// index 为消息ID
collector.emit(new Values(line),index);bolt中需要对tuple进行确认(ack() | fail())
javapublic void execute(Tuple tuple) {
String line = tuple.getString(0);
System.out.println(this + " : " + line);
if(new Random().nextBoolean()){
//确认
collector.ack(tuple);
}
else{
//失败
collector.fail(tuple);
}
}实现spout的ack()和fail()方法
javapublic void ack(Object msgId) {
System.out.println(this + " : ack() : " + msgId);
}
public void fail(Object msgId) {
System.out.println(this + " : fail() : " + msgId);
}
6.2 确保消息消费
用一个队列来存放消息队列,如果失败再fail函数里面进行重发。N次后丢弃。
七、整合 Kafka
storm以消费者从kafka队列中提取消息。
步骤
添加依赖
xml<!-- 整合kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.2.2</version>
</dependency>编写Blot类
javapackage cn.wangbowen.storm.kafka;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
* MyBlot class
*
* @author BoWenWang
* @date 2020/4/21 14:01
*/
public class MyBlot implements IRichBolt {
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.context = topologyContext;
this.collector = outputCollector;
}
/**
* 打印消息
*/
public void execute(Tuple tuple) {
System.out.println(tuple.getString(0));
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}编写App类
javapackage cn.wangbowen.storm.kafka;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import java.util.UUID;
/**
* App class
*
* @author BoWenWang
* @date 2020/4/21 13:51
*/
public class App {
public static void main(String[] args) throws Exception {
// zookeeper连接串
String zkConnString = "192.168.174.201:2181,192.168.174.202:2181,192.168.174.203:2181";
// 连接broker
BrokerHosts hosts = new ZkHosts(zkConnString);
// kafka主题
String topicName = "test4";
// 准备spout配置
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// kafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// 准备topology
TopologyBuilder builder = new TopologyBuilder();
// 设置选择spout
builder.setSpout("kafkaSpout", kafkaSpout);
// 设置选择bolt
builder.setBolt("myBolt", new MyBlot()).shuffleGrouping("kafkaSpout");
// 本地执行
LocalCluster cluster = new LocalCluster();
Config config = new Config();
cluster.submitTopology("storm-kafka", config, builder.createTopology());
}
}启动ZK、Kafka、storm集群、运行App方法
使用kafka控制台生产者发送消息给kafka
Codebin/kafka-console-producer.sh --broker-list s202:9092 --topic test4
>hello看storm控制台是否打印结果(消费)
八、整合 HBase
添加依赖
xml<!-- 整合HBase -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.6</version>
</dependency>编写数据源Spout类(引用上面的单词案例)
javapackage cn.wangbowen.storm.hbase;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* WordSpout class
*
* @author BoWenWang
* @date 2020/4/19 16:14
*/
public class WordSpout implements IRichSpout {
// 上下文
private TopologyContext context;
// 输出收集器
private SpoutOutputCollector collector;
// 随机发生器
private Random randomGenerator = new Random();
private Integer idx = 0;
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.context = topologyContext;
this.collector = spoutOutputCollector;
}
public void close() {
}
public void activate() {
}
public void deactivate() {
}
/**
* 产生数据,下一个元组,运行的时候,是不断执行这个方法的。
*/
public void nextTuple() {
if (this.idx++ < 100) {
List<String> lines = new ArrayList<>();
lines.add("hello world");
lines.add("hello tom");
lines.add("hello storm");
int i = randomGenerator.nextInt(3);
// 输出元组
this.collector.emit(new Values(lines.get(i)));
}
}
public void ack(Object o) {
}
public void fail(Object o) {
}
/**
* 声明输出数据(对应上面的输出元组)
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}编写切分Blot类
javapackage cn.wangbowen.storm.hbase;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* SplitBolt class
*
* @author BoWenWang
* @date 2020/4/19 16:28
*/
public class SplitBolt implements IRichBolt {
private OutputCollector collector;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
/**
* 处理传来的元组
*/
public void execute(Tuple tuple) {
// 根据下表获取元组对应位置的数据
String line = tuple.getString(0);
String[] words = line.split(" ");
for (String word : words) {
// 产生新的tuple交给下一个blot
this.collector.emit(new Values(word, 1));
}
}
public void cleanup() {
}
/**
* 声明字段
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "count"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}编写HbaseBlot类
javapackage cn.wangbowen.storm.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.io.IOException;
import java.util.Map;
/**
* HBaseBolt class
*
* @author BoWenWang
* @date 2020/4/21 16:35
*/
public class HBaseBolt implements IRichBolt {
private Table table;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
try {
// 创建conf对象
Configuration conf = HBaseConfiguration.create();
// 通过连接工厂创建连接对象
Connection conn = ConnectionFactory.createConnection(conf);
// 获得表名对象
TableName tableName = TableName.valueOf("ns1:word_count");
// 获得table
table = conn.getTable(tableName);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 获取单词-个数,增量到HBase
*/
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer count = tuple.getInteger(1);
byte[] rowKey = Bytes.toBytes(word);
byte[] f = Bytes.toBytes("f1");
byte[] c = Bytes.toBytes("count");
try {
table.incrementColumnValue(rowKey, f, c, count);
} catch (IOException e) {
e.printStackTrace();
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}编写App类
javapackage cn.wangbowen.storm.hbase;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
/**
* App class
*
* @author BoWenWang
* @date 2020/4/21 15:45
*/
public class App {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("wcSpout", new WordSpout());
builder.setBolt("splitBlot", new SplitBolt()).shuffleGrouping("wcSpout");
builder.setBolt("hbaseBlot", new HBaseBolt()).shuffleGrouping("splitBlot");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wc-Hbase", config, builder.createTopology());
}
}将 hbase-site.xml 和 hdfs-site.xml 配置文件放到 resources 下
启动ZK、Storm、hadoop、Hbase集群
创建表 ns1:word_count
Code$hbase shell
$hbase shell>create 'ns1:word_count' , 'f1'运行App(可能要等一会)
查看结果
Codehbase(main):078:0> scan 'ns1:word_count'
ROW COLUMN+CELL
hello column=f1:count, timestamp=1587461666807, value=\x00\x00\x00\x00\x00\x00\x00d
storm column=f1:count, timestamp=1587461666817, value=\x00\x00\x00\x00\x00\x00\x00!
tom column=f1:count, timestamp=1587461666795, value=\x00\x00\x00\x00\x00\x00\x00\x1E
world column=f1:count, timestamp=1587461666803, value=\x00\x00\x00\x00\x00\x00\x00%
4 row(s) in 0.0130 seconds利用之前的HBase JAVA API 查看
java/**
* 查看数据
*/
public void get() throws IOException {
// 创建conf对象
Configuration conf = HBaseConfiguration.create();
// 通过连接工厂创建连接对象
Connection conn = ConnectionFactory.createConnection(conf);
// 获得表名对象
TableName tableName = TableName.valueOf("ns1:word_count");
// 获得table
Table table = conn.getTable(tableName);
// 通过bytes工具类创建字节数组(将字符串)
byte[] rowId = Bytes.toBytes("hello");
// 创建get对象
Get get = new Get(rowId);
Result r = table.get(get);
byte[] idValue = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("count"));
System.out.println(Bytes.toLong(idValue));
}打印结果:
Code100