avatar

目录
Kafka学习笔记

Kafka学习笔记

官网http://kafka.apache.org

一、简介

  • 分布式流处理平台。

  • 在系统之间构建实时数据流管道。

  • 以topic分类对记录进行存储。

  • 每个记录包含key-value+timestamp

  • 每秒钟百万消息吞吐量。

组成:

Code
producer			//消息生产者
consumer //消息消费者
consumer group //消费者组
kafka server //broker,kafka服务器
topic //主题,副本数,分区.
zookeeper //hadoop namenoade + RM HA | hbase | kafka

二、安装

这里我们规划s202-s203-s204作为Kafka集群。

  1. 下载 kafka_2.12-2.4.1.tgz 并上传到s202

  2. tar、创建连接

    Code
    $tar -zxvf kafka_2.12-2.4.1.tgz -C /soft/
    $ln -s kafka_2.12-2.4.1 kafka
  3. 配置环境变量

    Code
    [/etc/profile]
    # Kafka
    export KAFKA_HOME=/soft/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

    $source /etc/profile
  4. 配置文件

    Code
    # 拷贝一份初始配置文件
    $cp server.properties server.properties.bak

    修改server.properties内容:

    properties
    # 设置ID,保证集群中唯一(这里取IP)
    broker.id=202
    # 打开注释
    listeners=PLAINTEXT://:9092
    # 修改日志目录
    log.dirs=/home/wbw/kafka/logs
    # 修改zookeeper集群
    zookeeper.connect=s201:2181,s202:2181,s203:2181
  5. 分发到s203,s204。注意修改broker.id

    Code
    $scp -r kafka_2.12-2.4.1 wbw@s203:/soft/
    $scp -r kafka_2.12-2.4.1 wbw@s204:/soft/

三、快速使用

3.1 启动Kafka服务器

  1. 确保ZK集群开启

  2. 启动Kafka,在S202~S204分别运行

    Code
    bin/kafka-server-start.sh -daemon config/server.properties

    -daemon :以守护线程方式启动

3.2 创建主题

Code
[wbw@s202 /soft/kafka]$bin/kafka-topics.sh --create --zookeeper s201:2181,s202:2181,s203:2181 --replication-factor 2 --partitions 2 --topic test

–create:创建主题

–bootstrap-server:引导服务

–replication-factor:副本数

–partitions:分区数

–topic:主题名

3.3 查看主题列表、描述

列表

Code
[wbw@s202 /soft/kafka]$bin/kafka-topics.sh --list --zookeeper s202:2181
test

描述

Code
[wbw@s202 /soft/kafka]$bin/kafka-topics.sh --describe --zookeeper s202:2181 --topic test
Topic: test PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 204 Replicas: 204,202 Isr: 204,202
Topic: test Partition: 1 Leader: 202 Replicas: 202,203 Isr: 202,203

注:该主题有2个分区,每个分区有2个副本。其中副本,又有leader和follow之分。一个消息过来,会被分配到一个分区中,所有副本上拥有该分区的主机都会复制一份。具体过程再副本中讲到。

3.4 启动控制台生产者

启动后,发送一些消息。

Code
[wbw@s202 /soft/kafka]$bin/kafka-console-producer.sh --broker-list s202:9092 --topic test
>
>;
>hello kafka
>This

3.5 启动控制台消费者

接受来自生产者的消息。

Code
[wbw@s203 /soft/kafka_2.12-2.4.1]$bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning

;
hello kafka
This

3.6 删除主题

Code
$kafka-topics.sh --delete --zookeeper s203:2181 --topic test

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
删除topic,需要在server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

四、副本模式

  • broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。

  • 支持到n-1故障。每个分区都有leader,follow.

  • leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。

  • 新leader的选举是通过isr进行,第一个注册的follower成为leader。

4.1 同步复制

  1. 生产者producer先联系zk找到leader
  2. 向leader发送消息
  3. leader收到消息后写入到本地log中
  4. 然后follower向leader,pull消息
  5. follower拿到消息后写入本地log
  6. 然后向leader发送ACK消息
  7. leader收到所有ACK消息后向producer回传ACK

4.2 异步复制

和同步复制的区别在于leader写入到本地log后,直接向client回传ACK消息,不需要等待所有follower复制完成,可能造成消息丢失

五、JAVA API

5.1 添加依赖

xml
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.4.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>

5.2 生产者

java
/**
* 生产者
*/
@Test
public void testProducer() {

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.174.202:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建生产者
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
// 发送数据
producer.send(new ProducerRecord<String, String>("test4", "This message from java API new"));
// 关闭
producer.close();

}

5.3 消费者

java
/**
* 消费者
*/
@Test
public void testConsumer() {

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.174.202:9092");
// kafka一个组只能收到1个消息,不论组里有多少个消费者,只有其中1个可以接受消息。
// 因此如果需要每个消费者都接收到消息,可以每个消费者各处一组
properties.put("group.id", "group-1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题列表
consumer.subscribe(Collections.singletonList("test4"));
// 消息轮询
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
文章作者: IT小王
文章链接: https://wangbowen.cn/2020/04/15/Kafka%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 IT小王

评论