avatar

目录
Flume学习笔记

Flume学习笔记

一、简介

  • 收集、移动、聚合大量日志数据的服务。

  • 基于流数据的架构,用于在线日志分析

  • 基于事件。

  • 在生产和消费者之间启动协调作用。

  • 提供了事务保证,确保消息一定被分发。

avatar

Source:接受数据,类型有多种。

Channel:临时存放地,对Source中来的数据进行缓冲,直到sink消费掉。

Sink:从channel提取数据存放到中央化存储(hadoop / hbase)。

二、安装

  1. 下载 apache-flume-1.9.0-bin.tar.gz

  2. 安装到自定义目录下

    Code
    $tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /soft/
    $cd /soft/
    $ln -s apache-flume-1.9.0-bin flume
  3. 环境变量

    Code
    $vi /etc/profile
    # flume
    export FLUME_HOME=/soft/flume
    export PATH=$PATH:$FLUME_HOME/bin

    $source /etc/profile
  4. 修改配置文件env,添加JAVA_HOME

  5. 验证安装是否成功

    Code
    $flume-ng version
    Flume 1.9.0

三、使用文档

官方手册http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

安装nc用作测试工具:

Code
$>sudo yum install nmap-ncat.x86_64

如果发现端口访问不了,可以把localhost改成0.0.0.0

3.1 Flume Source

数据收集。

3.1.1 netcat 监听端口

  1. 创建配置文件[/soft/flume/conf/netcat-conf.conf]

    properties
    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=44444

    # 定义sink信息
    a1.sinks.k1.type=logger

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
  2. 运行

    • 启动flume-agent

      Code
      [wbw@s201 /soft/flume/conf]$flume-ng agent -f netcat-source.conf -n a1 -Dflume.root.logger=INFO,console

      -f(–conf-file):配置文件

      -n(–name):配置文件中的flume对象

      –conf:配置文件目录

      -Dflume.root.logger=INFO:使用log4j

      console:打印到控制台

    • 另起一个窗口,查看端口是否启动

      Code
      $network -anop | grep 44444
    • 启动nc客户端,发送信息

      Code
       $>nc localhost 44444
      Hi,flume!
      OK

      avatar

3.1.2 exec 监听文件

实时日志收集,对一个文件的监听。当文件有新的内容的时候,会触发事件。

  1. 配置文件[exec-source.conf]

    properties
    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息(文件末尾追加的时候触发)
    a1.sources.r1.type=exec
    a1.sources.r1.command=tail -F /home/centos/flume-exec.txt

    # 定义sink信息
    a1.sinks.k1.type=logger

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
  2. 运行

    Code
    $flume-ng agent -f exec-source.conf -n a1 -Dflume.root.logger=INFO,console

    $echo test exec >> /home/wbw/flume-exec.txt
    $echo hello >> /home/wbw/flume-exec.txt

    avatar

3.1.3 spooldir 监听目录

监控一个文件夹,静态文件。

收集完之后,会重命名文件成新文件。.compeleted

不能实时收集数据,要把文件移动进去。

  1. 配置文件[spooldir-source.conf]

    properties
    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息(目录下文件变化时触发事件)
    a1.sources.r1.type=spooldir
    a1.sources.r1.spoolDir=/home/wbw/spooldir
    a1.sources.r1.fileHeader=true

    # 定义sink信息
    a1.sinks.k1.type=logger

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
  2. 运行

    Code
    $mkdir /home/wbw/spooldir
    $flume-ng agent -f spooldir-source.conf -n a1 -Dflume.root.logger=INFO,console

    $echo hello >> /home/wbw/spooldir/sp1.txt
    $echo spool >> /home/wbw/spooldir/sp2.txt
    $echo hello >> /home/wbw/spooldir/sp3.txt

    avatar

    avatar

3.1.4 seq 生成文件(压力测试)

通过生成指定文件数量,进行压力测试。

  1. 配置文件[seq-source.conf]

    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息(压力测试)
    a1.sources.r1.type=seq
    a1.sources.r1.totalEvent=1000 # 生成N个数据

    # 定义sink信息
    a1.sinks.k1.type=logger

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
  2. 运行

    Code
    $flume-ng agent -f seq-source.conf -n a1 -Dflume.root.logger=INFO,console

3.1.5 stress (压力测试)

  1. 配置文件

    Code
    a1.sources = stresssource-1
    a1.channels = memoryChannel-1
    a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
    a1.sources.stresssource-1.size = 10240
    a1.sources.stresssource-1.maxTotalEvents = 1000000
    a1.sources.stresssource-1.channels = memoryChannel-1

3.1.6 KafkaSource

作为消费者(Kafka => source)

  1. 配置文件

    properties
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置kafka
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 100
    a1.sources.r1.batchDurationMillis = 2000
    # 配置服务
    a1.sources.r1.kafka.bootstrap.servers = s202:9092
    # 主题
    a1.sources.r1.kafka.topics = test4
    # 组
    a1.sources.r1.kafka.consumer.group.id = g4

    a1.sinks.k1.type = logger

    a1.channels.c1.type=memory

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  2. 运行

    $flume-ng agent -f kafka-source.conf -n a1 -Dflume.root.logger=INFO,console

    # 运行kafka生产者 JAVA API,发送 "This is Kafka"

    # flume控制台打印消息
    2020-04-17 19:06:01,645 INFO sink.LoggerSink: Event: { headers:{topic=test4, partition=0, offset=4, timestamp=1587121557922} body: 54 68 69 73 20 69 73 20 4B 61 66 6B 61 This is Kafka

3.1.7 Log4j

  1. 配置文件 log4j-source.conf

    properties
    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息
    a1.sources.r1.type=avro
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=44444

    # 定义sink信息
    a1.sinks.k1.type=logger

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
  2. 编写java代码,生成日志

    • log4j.properties

      properties
      ### set log levels ###
      log4j.rootLogger = INFO,Console,flume

      ### 输出到控制台 ###
      log4j.appender.Console=org.apache.log4j.ConsoleAppender
      log4j.appender.Console.Target=System.out
      log4j.appender.Console.layout=org.apache.log4j.PatternLayout
      log4j.appender.Console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}] [%t] [%c] [%p] - %m%n

      # 输出到flume
      log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
      log4j.appender.flume.Hostname = s201
      log4j.appender.flume.Port = 44444
      log4j.appender.flume.UnsafeMode = true
    • LoggerGenerator.java

      java
      import org.apache.log4j.Logger;

      /**
      * LoggerGenerator class
      *
      * @author BoWenWang
      * @date 2020/5/29 12:08
      */
      public class LoggerGenerator {

      private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());


      public static void main(String[] args) throws InterruptedException {
      int index = 0;
      while (true) {
      Thread.sleep(1000);
      logger.info("value : " + index++);
      }
      }
      }
  3. 运行

    Code
    flume-ng agent -f log4j-source.conf -n a1 -Dflume.root.logger=INFO,console

3.2 Flume Sink

指定将数据存到哪。

3.2.1 hdfs

  1. 配置文件

    properties
    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=44444

    # 定义sink信息
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix=events-
    a1.sinks.k1.hdfs.useLocalTimeStamp=true

    # 是否产生新目录,每10分钟产生一个新的目录,一般控制的目录方面
    a1.sinks.k1.hdfs.round=true
    a1.sinks.k1.hdfs.roundValue=10
    a1.sinks.k1.hdfs.roundUnit=minute

    # 是否产生新文件,当文件达到一定大小后,会生成新文件,单位字节
    a1.sinks.k1.hdfs.rollnterval=10
    a1.sinks.k1.hdfs.rollSize=10
    a1.sinks.k1.hdfs.rollCount=3

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
  2. 运行

    Code
    $flume-ng agent -f hdfs-sink.conf -n a1

    # hdfs创建目录
    $hdfs dfs -mkdir /flume

    # 启动nc
    $nc localhost 44444
    hello,hdfs
    OK

    avatar

    avatar

3.2.2 hvie

很少用,hive要转mr太慢了。

3.2.3 hbase

  1. 配置文件

    properties
    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=44444

    # 定义sink信息
    a1.sinks.k1.type=hbase
    a1.sinks.k1.table=ns1:logger
    a1.sinks.k1.columnFamily=f1
    a1.sinks.k1.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
  2. 运行

    Code
    # 启动hbase集群,创建表
    $start-hbase.sh
    $hbase shell
    hbase(main):001:0> create 'ns1:logger','f1'

    # 启动flume
    $flume-ng agent -f hbase-sink.conf -n a1

    # 启动nc
    $nc localhost 44444
    hello,hbase
    OK

    avatar

3.2.4 kafka

作为生产者(sink => Kafka)

  1. 配置文件

    properties
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=44444

    # 配置KafkaSink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # 配置主题
    a1.sinks.k1.kafka.topic = test4
    # 配置服务
    a1.sinks.k1.kafka.bootstrap.servers = s202:9092
    # 配置大小
    a1.sinks.k1.kafka.flumeBatchSize = 20
    # 配置应答
    a1.sinks.k1.kafka.producer.acks = 1

    a1.channels.c1.type=memory

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  2. 运行

    # 运行flume
    $flume-ng agent -f kafka-sink.conf -n a1

    # 运行kafka消费者 JAVA API

    # 启动nc
    $nc localhost 44444
    hello,kafka
    OK

    # JAVA API控制台接收到消息
    offset = 0, value = hello,kafka

3.3 avro 跃点(Sink->Source)

使用avroSourceAvroSink实现跃点agent处理(从一个a1.Sink到一个a2.Source)

  1. 配置文件

    properties
    # a1(相当于客户端)
    # 声明3种组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    # 定义source信息
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=44444

    # 定义sink信息(作为a2的source)
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=localhost
    a1.sinks.k1.port=55555

    # 定义channel信息
    a1.channels.c1.type=memory

    # 绑定(1个sink只对应1个channel)
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1

    # a2(当相于服务端)
    # 声明3种组件
    a2.sources=r2
    a2.channels=c2
    a2.sinks=k2

    # 定义source信息(avro)
    a2.sources.r2.type=avro
    a2.sources.r2.bind=localhost
    a2.sources.r2.port=55555

    # 定义sink信息
    a2.sinks.k2.type=logger

    # 定义channel信息
    a2.channels.c2.type=memory

    # 绑定(1个sink只对应1个channel)
    a2.sources.r2.channels=c2
    a2.sinks.k2.channel=c2
  2. 启动并验证a2

    Code
    $flume-ng agent -f avro.conf -n a2 -Dflume.root.logger=INFO,console

    $netstat -lnpt | grep 55555
    (Not all processes could be identified, non-owned process info
    will not be shown, you would have to be root to see it all.)
    tcp6 0 0 127.0.0.1:55555 :::* LISTEN 60646/java
  3. 启动并验证a1

    Code
    $flume-ng agent -f avro.conf -n a1

    $netstat -lnpt | grep 44444
    (Not all processes could be identified, non-owned process info
    will not be shown, you would have to be root to see it all.)
    tcp6 0 0 127.0.0.1:44444 :::* LISTEN 62129/java
  4. nc发送数据给a1

    Code
    $nc localhost 44444
    hello,avro!
    OK

    avatar

3.4 Flume Channel

通道是事件在代理上进行的存储库。源添加事件,接收器将其删除。

3.4.1 Memory Channel 内存通道

有缺点,如果出问题。通道中的数据就丢了。

Code
# 和之前的配置那样
a2.channels.c2.type=memory

3.4.2 File Channel 文件通道

通道中的文件不会丢,但是占空间。

  1. 配置文件

    properties
    a1.sources = r1
    a1.sinks= k1
    a1.channels = c1

    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=44444

    a1.sinks.k1.type=logger

    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/centos/flume/fc_check
    a1.channels.c1.dataDirs = /home/wbw/flume/fc_data

    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1

3.4.3 Spillable Memory Channel 可溢出文件通道

事件存储在内存队列中和磁盘上。内存中队列充当主存储,磁盘充当溢出。使用嵌入式文件通道管理磁盘存储。当内存中队列已满时,其他传入事件将存储在文件通道中。该通道非常适合在正常操作期间需要高存储通道吞吐量的流,但同时又需要更大的文件通道容量,以更好地容忍间歇性接收器侧中断或排水速率下降。在这种异常情况下,吞吐量将大约降低到文件通道速度。如果代理崩溃或重新启动,则当代理联机时,只会恢复磁盘上存储的事件。该频道目前处于实验阶段,不建议在生产中使用。

  1. 配置文件

    properties
    a1.channels = c1
    a1.channels.c1.type = SPILLABLEMEMORY
    #0表示禁用内存通道,等价于文件通道
    a1.channels.c1.memoryCapacity = 0
    #0,禁用文件通道,等价内存通道。
    a1.channels.c1.overflowCapacity = 2000

    a1.channels.c1.byteCapacity = 800000
    a1.channels.c1.checkpointDir = /user/centos/flume/fc_check
    a1.channels.c1.dataDirs = /user/centos/flume/fc_data

3.4.4 KafkaChannel 数据库通道

生产者 + 消费者(source => channel => sink)

properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

a1.sinks.k1.type = logger

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = s202:9092
a1.channels.c1.kafka.topic = test4
a1.channels.c1.kafka.consumer.group.id = g6

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

四、JAVA API

4.1 pom 依赖

xml
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>

其他组件,导入相应依赖。

文章作者: IT小王
文章链接: https://wangbowen.cn/2020/04/13/Flume%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 IT小王

评论