Flume学习笔记
一、简介
收集、移动、聚合大量日志数据的服务。
基于流数据的架构,用于在线日志分析
基于事件。
在生产和消费者之间启动协调作用。
提供了事务保证,确保消息一定被分发。
Source:接受数据,类型有多种。
Channel:临时存放地,对Source中来的数据进行缓冲,直到sink消费掉。
Sink:从channel提取数据存放到中央化存储(hadoop / hbase)。
二、安装
下载 apache-flume-1.9.0-bin.tar.gz
安装到自定义目录下
Code$tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /soft/
$cd /soft/
$ln -s apache-flume-1.9.0-bin flume环境变量
Code$vi /etc/profile
# flume
export FLUME_HOME=/soft/flume
export PATH=$PATH:$FLUME_HOME/bin
$source /etc/profile修改配置文件env,添加JAVA_HOME
验证安装是否成功
Code$flume-ng version
Flume 1.9.0
三、使用文档
官方手册:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
安装nc用作测试工具:
$>sudo yum install nmap-ncat.x86_64 |
如果发现端口访问不了,可以把localhost改成0.0.0.0
3.1 Flume Source
数据收集。
3.1.1 netcat 监听端口
创建配置文件[/soft/flume/conf/netcat-conf.conf]
properties# 声明3种组件
r1 =
c1 =
k1 =
# 定义source信息
netcat =
localhost =
44444 =
# 定义sink信息
logger =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c1 =
c1 =运行
启动flume-agent
Code[wbw@s201 /soft/flume/conf]$flume-ng agent -f netcat-source.conf -n a1 -Dflume.root.logger=INFO,console
-f(–conf-file):配置文件
-n(–name):配置文件中的flume对象
–conf:配置文件目录
-Dflume.root.logger=INFO:使用log4j
console:打印到控制台
另起一个窗口,查看端口是否启动
Code$network -anop | grep 44444
启动nc客户端,发送信息
Code$>nc localhost 44444
Hi,flume!
OK
3.1.2 exec 监听文件
实时日志收集,对一个文件的监听。当文件有新的内容的时候,会触发事件。
配置文件[exec-source.conf]
properties# 声明3种组件
r1 =
c1 =
k1 =
# 定义source信息(文件末尾追加的时候触发)
exec =
tail -F /home/centos/flume-exec.txt =
# 定义sink信息
logger =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c1 =
c1 =运行
Code$flume-ng agent -f exec-source.conf -n a1 -Dflume.root.logger=INFO,console
$echo test exec >> /home/wbw/flume-exec.txt
$echo hello >> /home/wbw/flume-exec.txt
3.1.3 spooldir 监听目录
监控一个文件夹,静态文件。
收集完之后,会重命名文件成新文件。.compeleted
不能实时收集数据,要把文件移动进去。
配置文件[spooldir-source.conf]
properties# 声明3种组件
r1 =
c1 =
k1 =
# 定义source信息(目录下文件变化时触发事件)
spooldir =
/home/wbw/spooldir =
true =
# 定义sink信息
logger =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c1 =
c1 =运行
Code$mkdir /home/wbw/spooldir
$flume-ng agent -f spooldir-source.conf -n a1 -Dflume.root.logger=INFO,console
$echo hello >> /home/wbw/spooldir/sp1.txt
$echo spool >> /home/wbw/spooldir/sp2.txt
$echo hello >> /home/wbw/spooldir/sp3.txt
3.1.4 seq 生成文件(压力测试)
通过生成指定文件数量,进行压力测试。
配置文件[seq-source.conf]
# 声明3种组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# 定义source信息(压力测试)
a1.sources.r1.type=seq
a1.sources.r1.totalEvent=1000 # 生成N个数据
# 定义sink信息
a1.sinks.k1.type=logger
# 定义channel信息
a1.channels.c1.type=memory
# 绑定(1个sink只对应1个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1运行
Code$flume-ng agent -f seq-source.conf -n a1 -Dflume.root.logger=INFO,console
3.1.5 stress (压力测试)
配置文件
Codea1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
3.1.6 KafkaSource
作为消费者(Kafka => source)
配置文件
propertiesr1 =
k1 =
c1 =
# 配置kafka
org.apache.flume.source.kafka.KafkaSource =
100 =
2000 =
# 配置服务
s202:9092 =
# 主题
test4 =
# 组
g4 =
logger =
memory =
c1 =
c1 =运行
$flume-ng agent -f kafka-source.conf -n a1 -Dflume.root.logger=INFO,console
# 运行kafka生产者 JAVA API,发送 "This is Kafka"
# flume控制台打印消息
2020-04-17 19:06:01,645 INFO sink.LoggerSink: Event: { headers:{topic=test4, partition=0, offset=4, timestamp=1587121557922} body: 54 68 69 73 20 69 73 20 4B 61 66 6B 61 This is Kafka
3.1.7 Log4j
配置文件 log4j-source.conf
properties# 声明3种组件
r1 =
c1 =
k1 =
# 定义source信息
avro =
0.0.0.0 =
44444 =
# 定义sink信息
logger =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c1 =
c1 =编写java代码,生成日志
log4j.properties
properties### set log levels ###
INFO,Console,flume =
### 输出到控制台 ###
org.apache.log4j.ConsoleAppender =
System.out =
org.apache.log4j.PatternLayout =
%d{yyyy-MM-dd HH:mm:ss,SSS}] [%t] [%c] [%p] - %m%n =
# 输出到flume
org.apache.flume.clients.log4jappender.Log4jAppender =
s201 =
44444 =
true =LoggerGenerator.java
javaimport org.apache.log4j.Logger;
/**
* LoggerGenerator class
*
* @author BoWenWang
* @date 2020/5/29 12:08
*/
public class LoggerGenerator {
private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
public static void main(String[] args) throws InterruptedException {
int index = 0;
while (true) {
Thread.sleep(1000);
logger.info("value : " + index++);
}
}
}
运行
Codeflume-ng agent -f log4j-source.conf -n a1 -Dflume.root.logger=INFO,console
3.2 Flume Sink
指定将数据存到哪。
3.2.1 hdfs
配置文件
properties# 声明3种组件
r1 =
c1 =
k1 =
# 定义source信息
netcat =
localhost =
44444 =
# 定义sink信息
hdfs =
/flume/events/%y-%m-%d/%H%M/%S =
events- =
true =
# 是否产生新目录,每10分钟产生一个新的目录,一般控制的目录方面
true =
10 =
minute =
# 是否产生新文件,当文件达到一定大小后,会生成新文件,单位字节
10 =
10 =
3 =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c1 =
c1 =运行
Code$flume-ng agent -f hdfs-sink.conf -n a1
# hdfs创建目录
$hdfs dfs -mkdir /flume
# 启动nc
$nc localhost 44444
hello,hdfs
OK
3.2.2 hvie
很少用,hive要转mr太慢了。
3.2.3 hbase
配置文件
properties# 声明3种组件
r1 =
c1 =
k1 =
# 定义source信息
netcat =
localhost =
44444 =
# 定义sink信息
hbase =
ns1:logger =
f1 =
org.apache.flume.sink.hbase.RegexHbaseEventSerializer =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c1 =
c1 =运行
Code# 启动hbase集群,创建表
$start-hbase.sh
$hbase shell
hbase(main):001:0> create 'ns1:logger','f1'
# 启动flume
$flume-ng agent -f hbase-sink.conf -n a1
# 启动nc
$nc localhost 44444
hello,hbase
OK
3.2.4 kafka
作为生产者(sink => Kafka)
配置文件
propertiesr1 =
k1 =
c1 =
netcat =
localhost =
44444 =
# 配置KafkaSink
org.apache.flume.sink.kafka.KafkaSink =
# 配置主题
test4 =
# 配置服务
s202:9092 =
# 配置大小
20 =
# 配置应答
1 =
memory =
c1 =
c1 =运行
# 运行flume
$flume-ng agent -f kafka-sink.conf -n a1
# 运行kafka消费者 JAVA API
# 启动nc
$nc localhost 44444
hello,kafka
OK
# JAVA API控制台接收到消息
offset = 0, value = hello,kafka
3.3 avro 跃点(Sink->Source)
使用avroSource和AvroSink实现跃点agent处理(从一个a1.Sink到一个a2.Source)
配置文件
properties# a1(相当于客户端)
# 声明3种组件
r1 =
c1 =
k1 =
# 定义source信息
netcat =
localhost =
44444 =
# 定义sink信息(作为a2的source)
avro =
localhost =
55555 =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c1 =
c1 =
# a2(当相于服务端)
# 声明3种组件
r2 =
c2 =
k2 =
# 定义source信息(avro)
avro =
localhost =
55555 =
# 定义sink信息
logger =
# 定义channel信息
memory =
# 绑定(1个sink只对应1个channel)
c2 =
c2 =启动并验证a2
Code$flume-ng agent -f avro.conf -n a2 -Dflume.root.logger=INFO,console
$netstat -lnpt | grep 55555
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 127.0.0.1:55555 :::* LISTEN 60646/java启动并验证a1
Code$flume-ng agent -f avro.conf -n a1
$netstat -lnpt | grep 44444
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 127.0.0.1:44444 :::* LISTEN 62129/javanc发送数据给a1
Code$nc localhost 44444
hello,avro!
OK
3.4 Flume Channel
通道是事件在代理上进行的存储库。源添加事件,接收器将其删除。
3.4.1 Memory Channel 内存通道
有缺点,如果出问题。通道中的数据就丢了。
# 和之前的配置那样 |
3.4.2 File Channel 文件通道
通道中的文件不会丢,但是占空间。
配置文件
propertiesr1 =
k1 =
c1 =
netcat =
localhost =
44444 =
logger =
file =
/home/centos/flume/fc_check =
/home/wbw/flume/fc_data =
c1 =
c1 =
3.4.3 Spillable Memory Channel 可溢出文件通道
事件存储在内存队列中和磁盘上。内存中队列充当主存储,磁盘充当溢出。使用嵌入式文件通道管理磁盘存储。当内存中队列已满时,其他传入事件将存储在文件通道中。该通道非常适合在正常操作期间需要高存储通道吞吐量的流,但同时又需要更大的文件通道容量,以更好地容忍间歇性接收器侧中断或排水速率下降。在这种异常情况下,吞吐量将大约降低到文件通道速度。如果代理崩溃或重新启动,则当代理联机时,只会恢复磁盘上存储的事件。该频道目前处于实验阶段,不建议在生产中使用。
配置文件
propertiesc1 =
SPILLABLEMEMORY =
#0表示禁用内存通道,等价于文件通道
0 =
#0,禁用文件通道,等价内存通道。
2000 =
800000 =
/user/centos/flume/fc_check =
/user/centos/flume/fc_data =
3.4.4 KafkaChannel 数据库通道
生产者 + 消费者(source => channel => sink)
r1 = |
四、JAVA API
4.1 pom 依赖
<dependency> |
其他组件,导入相应依赖。