1. 1. Kafka技术探索
    1. 1.1. 第1章 Kafka入门
      1. 1.1.1. 1.1 概述
        1. 1.1.1.1. 1.1.1 初识Kafka
        2. 1.1.1.2. 1.1.2 消息队列
        3. 1.1.1.3. 1.1.3 生产者/消费者模式
        4. 1.1.1.4. 1.1.4 消息中间件对比
        5. 1.1.1.5. 1.1.5 ZooKeeper
      2. 1.1.2. 1.2 快速上手
        1. 1.1.2.1. 1.2.1 环境安装
        2. 1.1.2.2. 1.2.2 消息主题
          1. 1.1.2.2.1. 1.2.2.1 创建主题
          2. 1.1.2.2.2. 1.2.2.2 查询主题
          3. 1.1.2.2.3. 1.2.2.3 修改主题
          4. 1.1.2.2.4. 1.2.2.4 删除主题
        3. 1.1.2.3. 1.2.3 生产数据
          1. 1.1.2.3.1. 1.2.3.1 命令行操作
          2. 1.1.2.3.2. 1.2.3.2 工具操作
          3. 1.1.2.3.3. 1.2.3.3 Java API操作
        4. 1.1.2.4. 1.2.4 消费数据
          1. 1.1.2.4.1. 1.2.4.1 命令行操作
          2. 1.1.2.4.2. 1.2.4.2 Java API操作
    2. 1.2. 第2章 Kafka基础
      1. 1.2.1. 2.1 集群部署
      2. 1.2.2. 2.2 集群启动
        1. 1.2.2.1. 2.2.1 相关概念
          1. 1.2.2.1.1. 2.2.1.1 代理Broker
          2. 1.2.2.1.2. 2.2.1.2 控制器Controller
        2. 1.2.2.2. 2.2.2 启动ZooKeeper
        3. 1.2.2.3. 2.2.3 启动Kafka
          1. 1.2.2.3.1. 2.2.3.1 初始化ZooKeeper
          2. 1.2.2.3.2. 2.2.3.2 初始化服务
          3. 1.2.2.3.3. 2.2.3.3 启动控制器
      3. 1.2.3. 2.3 创建主题
        1. 1.2.3.1. 2.3.1 相关概念
          1. 1.2.3.1.1. 2.3.1.1 主题Topic
          2. 1.2.3.1.2. 2.3.1.2 分区Partition
          3. 1.2.3.1.3. 2.3.1.3 副本Replication
          4. 1.2.3.1.4. 2.3.1.4 副本类型Leader/Follower
          5. 1.2.3.1.5. 2.3.1.5 日志Log
        2. 1.2.3.2. 2.3.2 创建第一个主题
          1. 1.2.3.2.1. 2.3.2.1 ZooKeeper节点变化
          2. 1.2.3.2.2. 2.3.2.2 数据存储位置
        3. 1.2.3.3. 2.3.3 创建第二个主题
          1. 1.2.3.3.1. 2.3.3.1 ZooKeeper节点变化
          2. 1.2.3.3.2. 2.3.3.2 数据存储位置
        4. 1.2.3.4. 2.3.4 创建第三个主题
          1. 1.2.3.4.1. 2.3.4.1 ZooKeeper节点变化
          2. 1.2.3.4.2. 2.3.4.2 数据存储位置
        5. 1.2.3.5. 2.3.5 创建主题流程
          1. 1.2.3.5.1. 2.3.5.1 命令行提交创建指令
          2. 1.2.3.5.2. 2.3.5.2 Controller接收创建主题请求
          3. 1.2.3.5.3. 2.3.5.3 创建主题
      4. 1.2.4. 2.4 生产消息
        1. 1.2.4.1. 2.4.1 生产消息的基本步骤
        2. 1.2.4.2. 2.4.2 生产消息的基本代码
        3. 1.2.4.3. 2.4.3 发送消息
          1. 1.2.4.3.1. 2.4.3.1 拦截器
          2. 1.2.4.3.2. 2.4.3.2 回调方法
          3. 1.2.4.3.3. 2.4.3.3 异步发送
          4. 1.2.4.3.4. 2.4.3.4 同步发送
        4. 1.2.4.4. 2.4.4 消息分区
          1. 1.2.4.4.1. 2.4.4.1 指定分区
          2. 1.2.4.4.2. 2.4.4.2 未指定分区
          3. 1.2.4.4.3. 2.4.4.3 分区器
        5. 1.2.4.5. 2.4.5 消息可靠性
          1. 1.2.4.5.1. 2.4.5.1 ACK=0
          2. 1.2.4.5.2. 2.4.5.2 ACK=1
          3. 1.2.4.5.3. 2.4.5.3 ACK=all
        6. 1.2.4.6. 2.4.6 消息去重&有序
          1. 1.2.4.6.1. 2.4.6.1 数据重试
          2. 1.2.4.6.2. 2.4.6.2 数据乱序
          3. 1.2.4.6.3. 2.4.6.3 数据幂等性
          4. 1.2.4.6.4. 2.4.6.4 数据事务
          5. 1.2.4.6.5. 2.4.6.5 数据传输语义
      5. 1.2.5. 2.5 存储消息
        1. 1.2.5.1. 2.5.1 存储组件
        2. 1.2.5.2. 2.5.2 数据存储
          1. 1.2.5.2.1. 2.5.2.1 ACKS校验
          2. 1.2.5.2.2. 2.5.2.2 内部主题校验
          3. 1.2.5.2.3. 2.5.2.3 ACKS应答及副本数量关系校验
          4. 1.2.5.2.4. 2.5.2.4 日志文件滚动判断
          5. 1.2.5.2.5. 2.5.2.5 请求数据重复性校验
          6. 1.2.5.2.6. 2.5.2.6 请求数据序列号校验
          7. 1.2.5.2.7. 2.5.2.7 数据存储
        3. 1.2.5.3. 2.5.3 存储文件格式
          1. 1.2.5.3.1. 2.5.3.1 数据日志文件
          2. 1.2.5.3.2. 2.5.3.2 数据索引文件
          3. 1.2.5.3.3. 2.5.3.3 数据时间戳索引文件
        4. 1.2.5.4. 2.5.4 数据刷写
        5. 1.2.5.5. 2.5.5 副本同步
          1. 1.2.5.5.1. 2.5.5.1 启动数据同步线程
          2. 1.2.5.5.2. 2.5.5.2 生成数据同步请求
          3. 1.2.5.5.3. 2.5.5.3 处理数据响应
          4. 1.2.5.5.4. 2.5.5.4 更新数据偏移量
        6. 1.2.5.6. 2.5.6 数据一致性
          1. 1.2.5.6.1. 2.5.6.1 数据一致性表现
          2. 1.2.5.6.2. 2.5.6.2 HW在副本之间的传递
          3. 1.2.5.6.3. 2.5.6.3 ISR(In-Sync-Replicas)伸缩
      6. 1.2.6. 2.6 消费消息
        1. 1.2.6.1. 2.6.1 消费消息的基本步骤
        2. 1.2.6.2. 2.6.2 消费消息的基本代码
        3. 1.2.6.3. 2.6.3 消费消息的基本原理
          1. 1.2.6.3.1. 2.6.3.1 消费者组
          2. 1.2.6.3.2. 2.6.3.2 组调度器GroupCoordinator
          3. 1.2.6.3.3. 2.6.3.3 消费者分配策略Assignor
          4. 1.2.6.3.4. 2.6.3.4 消费者偏移量
          5. 1.2.6.3.5. 2.6.3.5 消费者事务
          6. 1.2.6.3.6. 2.6.3.6 偏移量保存
          7. 1.2.6.3.7. 2.6.3.7 消费数据

Kafka技术探索

Kafka技术探索

第1章 Kafka入门

1.1 概述

1.1.1 初识Kafka

Kafka是一个由ScalaJava语言开发的,经典高吞吐量的分布式消息发布和订阅系统,也是大数据技术领域中用作数据交换的核心组件之一。以高吞吐,低延迟,高伸缩,高可靠性,高并发,且社区活跃度高等特性,从而备受广大技术组织的喜爱。

image-20240427135245141

因为备受技术组织的喜爱,2011年,Kafka软件被捐献给Apache基金会,并于7月被纳入Apache软件基金会孵化器项目进行孵化。2012年10月,Kafka从孵化器项目中毕业,转成Apache的顶级项目。由独立的消息日志传输系统转型为开源分布式事件流处理平台系统,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。

image-20240427134506417

1.1.2 消息队列

Kafka软件最初的设计就是专门用于数据传输的消息系统,类似功能的软件有RabbitMQ、ActiveMQ、RocketMQ等。这些软件名称中的MQ是英文单词Message Queue的简称,也就是所谓的消息队列的意思。这些软件的核心功能是传输数据,而Java中如果想要实现数据传输功能,那么这个软件一般需要遵循Java消息服务技术规范JMS(Java Message Service)

前面提到的ActiveMQ软件就完全遵循了JMS技术规范,而RabbitMQ是遵循了类似JMS规范并兼容JMS规范的跨平台的AMQP(Advanced Message Queuing Protocol)规范。Kafka拥有作为一个消息系统应该具备的功能,但是却有着独特的设计。可以这样说,Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范。这也恰恰是软件名称为Kafka,而不是KafkaMQ的原因。

由上可知,无论学习哪一种消息传输系统,JMS规范都是大家应该首先了解的。所以咱们这里就对JMS规范做一个简单的介绍:

  • JMS是Java平台的消息中间件通用规范,定义了主要用于消息中间件的标准接口。如果不是很理解这个概念,可以简单地将JMS类比为Java和数据库之间的JDBC规范。Java应用程序根据JDBC规范中的接口访问关系型数据库,而每个关系型数据库厂商可以根据JDBC接口来实现具体的访问规则。JMS定义的就是系统和系统之间传输消息的接口。

  • 为了实现系统和系统之间的数据传输,JMS规范中定义很多用于通信的组件:

    image-20240427141415060

    • JMS Provider:JMS消息提供者。其实就是实现JMS接口和规范的消息中间件,也就是我们提供消息服务的软件系统,比如RabbitMQ、ActiveMQ、Kafka。
    • JMS Message:JMS消息。一般采用Java数据模型进行封装,其中包含消息头,消息属性和消息主体内容。
    • JMS Producer:JMS消息生产者。所谓的生产者,就是生产数据的客户端应用程序,这些应用通过JMS接口发送JMS消息。
    • JMS Consumer:JMS消息消费者。所谓的消费者,就是获取数据的客户端应用程序,这些应用通过JMS接口接收JMS消息。
  • JMS支持两种消息发送和接收模型:一种是点对点(Peer-to-Peer)模型,另外一种是发布订阅(Publish/Subscribe)模型。

    • 点对点模型:P2P模型是基于队列的,消息生产者将数据发送到消息队列中,消息消费者从消息队列中接收消息。因为队列的存在,消息的异步传输成为可能。P2P模型的规定就是每一个消息数据,只能有一个消费者,当发送者发送消息以后,不管接收者有没有运行都不影响消息发布到队列中。接收者在成功接收消息后会向发送者发送接收成功的消息。
    • 发布订阅模型:所谓得发布订阅模型就是事先将传输的数据进行分类,我们管这个数据的分类称之为主题(Topic)。也就是说,生产者发送消息时,会根据主题进行发送,感兴趣的消费者可以申请订阅特定的主题,然后从该主题中获取消息。这样,也就是说一个消息,是允许被多个消费者同时消费的。这里生产者向主题中发送消息,我们称之为发布消息,而消费者从主题中获取消息,我们就称之为订阅消息。Kafka采用就是这种模型。

1.1.3 生产者/消费者模式

生产者-消费者模式是通过一个数据容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个消息缓冲区,平衡了生产者和消费者的处理能力。在数据传输过程中,起到了一个削弱峰值的作用,也就是我们经常说到的削峰。

image-20240427142444917

图形中的缓冲区就是用来给生产者和消费者解耦的。在单点环境中,我们一般会采用阻塞式队列实现这个缓冲区。而在分布式环境中,一般会采用第三方软件实现缓冲区,这个第三方软件我们一般称之为中间件。纵观大多数应用场景,解耦合最常用的方式就是增加中间件。

1.1.4 消息中间件对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比RocketMQ,Kafka低一个数量级 万级,比RocketMQ,Kafka低一个数量级 10万级,支持高吞吐 10万级,支持高吞吐
Topic数量对吞吐量的影响 Topic可以达到几百/几千量级 Topic可以达到几百量级,如果更多的话,吞吐量会大幅度下降
时效性 ms级 微秒级别,延迟最低 ms级 ms级
可用性 高,基于主从架构实现高可用 高,基于主从架构实现高可用 非常高,分布式架构 非常高,分布式架构
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到0丢失 经过参数优化配置,可以做到0丢失
功能支持 MQ领域的功能极其完备 并发能力强,性能极好,延时很低 MQ功能较为完善,分布式,扩展性好 功能较为简单,支持简单的MQ功能,在大数据领域被广泛使用
其他 很早的软件,社区不是很活跃 开源,稳定,社区活跃度高 阿里开发,社区活跃度不高 开源,高吞吐量,社区活跃度极高

1.1.5 ZooKeeper

ZooKeeper是一个开源的分布式应用程序协调服务软件。在当前的软件开发中,多节点分布式的架构设计已经成为必然,那么如何保证架构中不同的节点所运行的环境,系统配置是相同的,就是一个非常重要的话题。一般情况下,我们会采用独立的第三方软件保存分布式系统中的全局环境信息以及系统配置信息,这样系统中的每一个节点在运行时就可以从第三方软件中获取一致的数据。也就是说通过这个第三方软件来协调分布式各个节点之间的环境以及配置信息。Kafka软件是一个分布式事件流处理平台系统,底层采用分布式的架构设计,就是说,也存在多个服务节点,多个节点之间Kafka就是采用ZooKeeper来实现协调调度的。

ZooKeeper的核心作用:

  • ZooKeeper的数据存储结构可以简单地理解为一个Tree结构,而Tree结构上的每一个节点可以用于存储数据,所以一般情况下,我们可以将分布式系统的元数据(环境信息以及系统配置信息)保存在ZooKeeper节点中。
  • ZooKeeper创建数据节点时,会根据业务场景创建临时节点或持久节点。持久节点就是无论客户端是否连接上ZooKeeper都一直存在的节点,而临时节点指的是客户端连接时创建,断开连接后删除的节点。同时,ZooKeeper也提供了Watch机制用于监控节点的变化,然后通知对应的客户端进行相应的变化。Kafka软件中就内置了ZooKeeper的客户端,用于进行ZooKeeper的连接和通信。

其实,Kafka作为一个独立的分布式消息传输系统,还需要第三方软件进行节点间的协调调度,不能实现自我管理,无形中就导致Kafka和其他软件之间形成了耦合性,制约了Kafka软件的发展,所以从Kafka 2.8.X版本开始,Kafka就尝试增加了Raft算法实现节点间的协调管理,来代替ZooKeeper。不过Kafka官方不推荐此方式应用在生产环境中,计划在Kafka 4.X版本中完全移除ZooKeeper,让我们拭目以待。

1.2 快速上手

1.2.1 环境安装

作为开源分布式事件流处理平台,Kafka分布式软件环境的安装相对比较复杂,不利于Kafka软件的入门学习和练习。所以我们这里先搭建相对比较简单的Windows单机环境,让初学者快速掌握软件的基本原理和用法,后面的课程中,我们再深入学习Kafka软件在生产环境中的安装和使用。

zookeeper.properties文件的配置中需要修改的部分:

image-20240427144257213

server.properties文件的配置中需要修改的部分:

image-20240427150835625

启动Kafka单节点集群后的效果:

image-20240427145051515

image-20240427145149328

1.2.2 消息主题

有很多种方式都可以操作Kafka消息中的主题(Topic):命令行、第三方工具、Java API、自动创建。而对于初学者来讲,掌握基本的命令行操作是必要的。所以接下来,我们采用命令行进行操作。

1.2.2.1 创建主题

image-20240427150542871

1.2.2.2 查询主题

image-20240427150632339

1.2.2.3 修改主题

image-20240427151020788

1.2.2.4 删除主题

image-20240427151411014

注意:Windows系统中由于权限或进程锁定的问题,删除topic会导致Kafka服务节点异常关闭。

1.2.3 生产数据

1.2.3.1 命令行操作

image-20240427152411088

1.2.3.2 工具操作

image-20240427152700617

image-20240427153003130

1.2.3.3 Java API操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class KafkaProducerTest {
public static void main(String[] args) {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key" + i, "value" + i);
// 发送消息
kafkaProducer.send(record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

1.2.4 消费数据

1.2.4.1 命令行操作

image-20240427154442392

1.2.4.2 Java API操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class KafkaConsumerTest {
public static void main(String[] args) {
// 配置连接信息和反序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId1");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(configs);
// 订阅主题
kafkaConsumer.subscribe(Collections.singletonList("test"));
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
// 关闭消费者对象
// kafkaConsumer.close();
}
}

image-20240427155233130

第2章 Kafka基础

Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范,因此从设计原理上,Kafka的内部也会有很多用于数据传输的组件对象,这些组件对象之间会形成关联,组合在一起实现高效的数据传输。所以接下来,我们就按照数据流转的过程详细讲一讲Kafka中的基础概念以及核心组件。

2.1 集群部署

生产环境都是采用Linux系统搭建服务器集群,但是我们的重点是在于学习Kafka的基础概念和核心组件,所以这里我们搭建一个简单易用的Windows集群方便大家的学习和练习。Linux集群的搭建会在第3章给大家进行讲解。

image-20240427155838274

三个Kafka节点分别是kafka-broker-1、kafka-broker-2、kafka-broker-3,分别修改其对应的server.properties配置文件:

  • broker.id分别是1、2、3

  • listeners分别是PLAINTEXT://:9091PLAINTEXT://:9092PLAINTEXT://:9093

  • log.dirs分别是D:/cluster/kafka-broker-1/data、D:/cluster/kafka-broker-2/data、D:/cluster/kafka-broker-3/data

  • zookeeper.connect均为localhost:21810

集群启动脚本cluster.cmd的内容为:

1
2
3
4
5
6
7
8
9
cd kafka-zookeeper
start zk.cmd
ping 127.0.0.1 -n 10 >nul
cd ../kafka-broker-1
start kafka.cmd
cd ../kafka-broker-2
start kafka.cmd
cd ../kafka-broker-3
start kafka.cmd

集群清理脚本cluster-clear.cmd的内容为:

1
2
3
4
5
6
7
8
cd kafka-zookeeper
rd /s /q data
cd ../kafka-broker-1
rd /s /q data
cd ../kafka-broker-2
rd /s /q data
cd ../kafka-broker-3
rd /s /q data

启动Kafka多节点集群后的效果:

image-20240427160617064

image-20240427160652321

2.2 集群启动

2.2.1 相关概念

2.2.1.1 代理Broker

使用Kafka前,我们都会启动Kafka服务进程,这里的Kafka服务进程我们一般会称之为Kafka Broker或Kafka Server。因为Kafka是分布式消息系统,所以在实际的生产环境中,是需要多个服务进程形成集群提供消息服务的。所以每一个服务节点都是一个broker,而且在Kafka集群中,为了区分不同的服务节点,每一个broker都应该有一个不重复的全局ID,称之为broker.id,这个ID可以在Kafka软件的配置文件server.properties中进行配置。

主机 kafka-broker1 kafka-broker2 kafka-broker3
broker.id 1 2 3
2.2.1.2 控制器Controller

Kafka是分布式消息传输系统,所以存在多个Broker服务节点,但是它的软件架构采用的是分布式系统中比较常见的主从架构,也就是说需要从多个Broker中找到一个用于管理整个Kafka集群的Master节点,这个节点,我们就称之为Controller。它是Apache Kafka的核心组件非常重要。它的主要作用是在Zookeeper的帮助下管理和协调控制整个Kafka集群

image-20240427162725190

如果在运行过程中,Controller节点出现了故障,那么Kafka会依托于ZooKeeper软件选举其他的节点作为新的Controller,让Kafka集群实现高可用。

Kafka集群中Controller的基本功能:

  • Broker管理(监听/brokers/ids节点相关的变化)

    • Broker数量增加或减少的变化

    image-20240427164621444

    • Broker对应的数据变化
  • Topic管理

    • 新增:监听/brokers/topics节点相关的变化

    image-20240427163934876

    • 修改:监听/brokers/topics节点相关的变化
    • 删除:监听/admin/delete_topics节点相关的变化

    image-20240427164134740

  • Partation管理

    • 监听/admin/reassign_partitions节点相关的变化
    • 监听/isr_change_notification节点相关的变化
    • 监听/preferred_replica_election节点相关的变化
  • 数据服务

  • 启动分区状态机和副本状态机

2.2.2 启动ZooKeeper

Kafka集群中含有多个服务节点,而分布式系统中经典的主从(Master-Slave)架构就要求从多个服务节点中找一个节点作为集群管理Master,Kafka集群中的这个Master,我们称之为集群控制器Controller。

image-20240427165117122

如果此时Controller节点出现故障,它就不能再管理集群功能,那么其他的Slave节点该如何是好呢?

image-20240427165133415

如果从剩余的2个Slave节点中选一个节点出来作为新的集群控制器是不是一个不错的方案,我们将这个选择的过程称之为:选举(elect)。方案是不错,但是问题就在于选哪一个Slave节点呢?不同的软件实现类似的选举功能都会有一些选举算法,而Kafka是依赖于ZooKeeper软件实现Broker节点选举功能。

image-20240427165200812

ZooKeeper如何实现Kafka的节点选举呢?这就要说到我们用到ZooKeeper的3个功能:

  • 一个是在ZooKeeper软件中创建节点Node,创建一个Node时,我们会设定这个节点是持久节点,还是临时节点。所谓的持久节点,就是Node一旦创建后会一直存在,而临时创建,是根据当前的客户端连接创建的临时节点,一旦客户端连接断开,那么这个临时节点也会被自动删除,所以这样的节点称之为临时节点。
  • ZooKeeper节点是不允许有重复的,所以多个客户端创建同一个节点,只能有一个创建成功。
  • 另外一个是客户端可以在ZooKeeper的节点上增加监听器,用于监听节点的状态变化,一旦监听的节点状态发生变化,那么监听器就会触发响应,实现特定监听功能。

有了上面的三个知识点,我们这里就介绍一下Kafka是如何利用ZooKeeper实现Controller节点的选举的:

  1. 第一次启动Kafka集群时,会同时启动多个Broker节点,每一个Broker节点就会连接ZooKeeper,并尝试创建一个临时节点 /controller
  2. 因为ZooKeeper中一个节点不允许重复创建,所以多个Broker节点,最终只能有一个Broker节点可以创建成功,那么这个创建成功的Broker节点就会自动作为Kafka集群控制器节点,用于管理整个Kafka集群。
  3. 没有选举成功的其他Slave节点会创建Node监听器,用于监听/controller节点的状态变化。
  4. 一旦Controller节点出现故障或挂掉了,那么对应的ZooKeeper客户端连接就会中断。ZooKeeper中的/controller节点就会自动被删除,而其他的那些Slave节点因为增加了监听器,所以当监听到/controller节点被删除后,就会马上向ZooKeeper发出创建 /controller节点的请求,一旦创建成功,那么该Broker就变成了新的Controller节点了。

image-20240427165716354

现在我们能明白启动Kafka集群之前,为什么要先启动ZooKeeper集群了吧。就是因为ZooKeeper可以协助Kafka进行集群管理。

2.2.3 启动Kafka

ZooKeeper已经启动好了,那我们现在可以启动多个Kafka Broker节点构建Kafka集群了。构建的过程中,每一个Broker节点就是一个Java进程,而在这个进程中,有很多需要提前准备好,并进行初始化的内部组件对象。

2.2.3.1 初始化ZooKeeper

Kafka Broker启动时,首先会先创建ZooKeeper客户端(KafkaZkClient),用于和ZooKeeper进行交互。客户端对象创建完成后,会通过该客户端对象向ZooKeeper发送创建Node的请求,注意,这里创建的Node都是持久化Node。

image-20240427170608831

节点 类型 说明
/admin/delete_topics 持久化节点 配置需要删除的topic,因为删除过程中,可能broker下线,或执行失败,那么就需要在broker重新上线后,根据当前节点继续删除操作,一旦topic所有的分区数据全部删除,那么当前节点的数据才会进行清理
/brokers/ids 持久化节点 服务节点ID标识,只要broker启动,那么就会在当前节点中增加子节点,brokerID不能重复
/brokers/topics 持久化节点 服务节点中的主题详细信息,包括分区,副本
/brokers/seqid 持久化节点 seqid主要用于自动生产brokerId
/config/changes 持久化节点 kafka的元数据发生变化时,会向该节点下创建子节点,并写入对应信息
/config/clients 持久化节点 客户端配置,默认为空
/config/brokers 持久化节点 服务节点相关配置,默认为空
/config/ips 持久化节点 IP配置,默认为空
/config/topics 持久化节点 主题配置,默认为空
/config/users 持久化节点 用户配置,默认为空
/consumers 持久化节点 消费者节点,用于记录消费者相关信息
/isr_change_notification 持久化节点 ISR列表发生变更时候的通知,在kafka当中由于存在ISR列表变更的情况发生,为了保证ISR列表更新的及时性,定义了isr_change_notification这个节点,主要用于通知Controller来及时将ISR列表进行变更。
/latest_producer_id_block 持久化节点 保存PID块,主要用于能够保证生产者的任意写入请求都能够得到响应。
/log_dir_event_notification 持久化节点 主要用于保存当broker当中某些数据路径出现异常时候,例如磁盘损坏,文件读写失败等异常时候,向ZooKeeper当中增加一个通知序号,Controller节点监听到这个节点的变化之后,就会做出对应的处理操作
/cluster/id 持久化节点 主要用于保存kafka集群的唯一id信息,每个kafka集群都会给分配要给唯一id,以及对应的版本号
2.2.3.2 初始化服务

Kafka Broker中有很多的服务对象,用于实现内部管理和外部通信操作。

image-20240427171118763

  • 创建任务调度器

每一个Broker在启动时都会创建内部调度器(KafkaScheduler)并启动,用于完成节点内部的工作任务。底层就是Java中的定时任务线程池ScheduledThreadPoolExecutor

  • 创建日志数据管理器

每一个Broker在启动时都会创建日志数据管理器(LogManager),用于接收到消息后,完成后续的数据创建,查询,清理等处理。

  • 创建远程数据管理器

每一个Broker在启动时都会创建远程数据管理器(RemoteLogManager),用于和其他Broker节点进行数据状态同步。

  • 创建副本管理器

每一个Broker在启动时都会创建副本管理器(ReplicaManager),用于对主题分区的副本进行处理。

  • 创建ZK元数据缓存

每一个Broker在启动时会将ZK的关于Kafka的元数据进行缓存,创建元数据对象(ZkMetadataCache)。

  • 创建Broker通信对象

每一个Broker在启动时会创建Broker之间的通道管理器对象(BrokerToControllerChannelManager),用于管理Broker和Controller之间的通信。

  • 创建网络通信对象

每一个Broker在启动时会创建自己的网络通信对象(SocketServer),用于和其他Broker之间的进行通信,其中包含了Java用于NIO通信的Channel、Selector对象。

image-20240427171932255
  • 注册Broker节点

Broker启动时,会通过ZKClient对象向ZooKeeper注册当前的Broker节点ID,注册后创建的ZK节点为临时节点。如果当前Broker的ZKClient断开和ZooKeeper的连接,注册的节点会被删除。

2.2.3.3 启动控制器

控制器(KafkaController)是每一个Broker启动时都会创建的核心对象,用于和ZooKeeper之间建立连接并申请自己为整个Kafka集群的Master管理者。如果申请成功,那么会完成管理者的初始化操作,并建立和其他Broker之间的数据通道接收各种事件,进行封装后交给事件管理器,并定义了process方法,用于真正处理各类事件。

image-20240427172253401

  • 初始化通道管理器

创建通道管理器(ControllerChannelManager),该管理器维护了Controller和集群所有Broker节点之间的网络连接,并向Broker发送控制类请求及接收响应。

  • 初始化事件管理器

创建事件管理器(ControllerEventManager)维护了Controller和集群所有Broker节点之间的网络连接,并向Broker发送控制类请求及接收响应。

  • 初始化状态管理器

创建状态管理器(ControllerChangeHandler)可以监听/controller节点的操作,一旦节点创建(ControllerChange),删除(Reelect),数据发生变化(ControllerChange),那么监听后执行相应的处理。

  • 启动控制器

控制器对象启动后,会向事件管理器发送Startup事件,事件处理线程接收到事件后会通过ZKClient向ZooKeeper申请/controller节点,申请成功后,执行当前节点成为Controller的一系列操作。主要是注册各类ZooKeeper监听器、删除日志路径变更和ISR副本变更通知事件、启动Controller通道管理器,以及启动副本状态机和分区状态机。

2.3 创建主题

Topic主题是Kafka中消息的逻辑分类,但是这个分类不应该是固定的,而是应该由外部的业务场景进行定义(注意:Kafka中其实是有两个固定的内部主题,用于记录消费者偏移量和事务处理的Topic),所以Kafka提供了相应的指令和客户端进行主题操作。

  • __consumer_offsets:这个内部Topic用于存储Kafka消费者的偏移量信息。每个消费者组都会在这个Topic中维护其消费的进度,以确保消费者可以从上次离开的地方继续消费消息。
  • __transaction_state:这个内部Topic用于支持Kafka事务。Kafka支持事务性生产和消费操作,__transaction_state用于存储事务的元数据信息。

通常,这些内部Topic不需要用户手动操作,Kafka会自动管理它们。它们对Kafka集群的正常运行非常重要,因此最好不要对它们进行修改或删除操作,以避免影响Kafka的正常功能。

2.3.1 相关概念

2.3.1.1 主题Topic

Kafka是分布式消息传输系统,采用的数据传输方式为发布订阅模式,也就是说由消息的生产者发布消息,消费者订阅消息后获取数据。为了对消费者订阅的消息进行区分,所以对消息在逻辑上进行了分类,这个分类我们称之为主题:Topic。为了防止主题的名称和监控指标的名称产生冲突,官方推荐主题的名称中不要同时包含下划线和点。

image-20240427180933897
2.3.1.2 分区Partition

Kafka消息传输采用发布订阅模式,所以消息生产者必须将数据发送到一个主题,假如发送给这个主题的数据非常多,那么主题所在Broker节点的负载和吞吐量就会受到极大的考验,甚至有可能因为热点问题引起Broker节点故障,导致服务不可用。一个好的方案就是将一个主题从物理上分成几部分,然后将消息均匀地分配到不同的Broker节点分区上,这样就可以缓解单节点的负载问题。默认情况下,主题创建时分区数量为1,也就是一块分区,可以指定参数--partitions改变。Kafka的分区解决了单一主题线性扩展的问题,也解决了负载均衡的问题。

主题的每个分区都会用一个编号进行标记,一般是从0开始的连续整数数字。Partition分区是物理上的概念,也就意味着会以数据文件的方式真实存在。每个Topic包含一个或多个Partition,每个Partition都可以看作一个有序队列。Partition中每条消息都会分配一个有序的ID,称之为偏移量:Offset

image-20240427181416380
2.3.1.3 副本Replication

分布式系统出现错误是比较常见的,只要保证集群内部依然存在可用的服务节点即可,当然效率会有所降低,不过只要能保证系统可用就可以了。咱们Kafka的Topic也存在类似的问题,也就是说,如果一个Topic划分了多个分区,那么这些分区就会均匀地分布在不同的Broker节点上,一旦某一个Broker节点出现了问题,那么在这个节点上的分区就会出现问题,那么Topic的数据就不完整了。所以一般情况下,为了防止出现数据丢失的情况,我们会给分区数据设定多个备份,这里的备份,我们称之为副本:Replication。

Kafka支持多副本,使得主题可以做到更多容错性,牺牲性能与空间去换取更高的可靠性。

image-20240427181647955

注意:这里不能将多个备份放置在同一个Broker中,因为一旦出现故障,多个副本就都不能用了,那么副本的意义就没有了。

2.3.1.4 副本类型Leader/Follower

假设我们有一份文件,一般情况下,我们对副本的理解应该是有一个正式的完整文件,然后这个文件的备份称之为副本。但是在Kafka中不是这样的,所有的文件都称之为副本,只不过会选择其中的一个文件作为主文件称之为Leader副本,其他的文件作为备份文件称之为Follower副本。在Kafka中,这里的文件就是分区,每一个分区都可以存在1个或多个副本,只有Leader副本才能进行数据的读写,Follower副本只做备份使用。

image-20240427182233007
2.3.1.5 日志Log

Kafka最开始的应用场景就是日志场景或MQ场景,更多的扮演着一个日志传输和存储系统,这是Kafka立家之本。所以Kafka接收到的消息数据最终都是存储在log日志文件中的,底层存储数据的文件的扩展名就是.log

image-20240427182803170

2.3.2 创建第一个主题

创建主题Topic的方式有很多种:命令行,工具,客户端API,自动创建。在server.properties文件中配置参数auto.create.topics.enable=true时,如果访问的主题不存在,那么Kafka就会自动创建主题,这个操作不在我们的讨论范围内。我们首先创建的主题,仅仅指明主题的名称即可,其他参数暂时无需设定。

image-20240427183549123

2.3.2.1 ZooKeeper节点变化

指令执行后,当前Kafka会增加一个主题,因为指令中没有配置分区和副本参数,所以当前主题默认分区数量为1,编号为0,副本数量为1。为了方便集群的管理,创建Topic时,会同时在ZooKeeper中增加子节点,记录主题相关配置信息:

  • /config/topics节点中会增加first-topic节点。

image-20240427183631376

  • /brokers/topics节点中会增加first-topic节点以及相应的子节点。

image-20240427183815758

2.3.2.2 数据存储位置

主题创建后,需要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点配置信息可以知道,当前主题的分区数量为1,副本数量为1,那么数据存储的位置就是副本所在的Broker节点,从当前数据来看,数据存储在我们的Broker3上。

image-20240427184341097

路径中的00000000000000000000.log文件就是真正存储消息数据的文件,文件名称中的0表示当前文件中第一个消息的起始偏移量为0,index文件和timeindex文件都是数据索引文件,用于快速定位数据。只不过index文件采用偏移量的方式进行定位,而timeindex是采用时间戳的方式。

查看的命令:kafka-run-class.sh kafka.tools.DumpLogSegments –files (000…..log|000……index) –print-data-log

2.3.3 创建第二个主题

接下来我们创建第二个主题,不过创建时,我们需要设定分区参数--partitions,参数值为3,表示创建3个分区

image-20240427185256204

2.3.3.1 ZooKeeper节点变化

指令执行后,当前Kafka会增加一个主题,因为指令中指定了分区数量(–partitions 3),所以当前主题分区数量为3,编号为[0、1、2],副本数量为1,编号为所在Broker的ID值。为了方便集群的管理,创建Topic时,会同时在ZooKeeper中增加子节点,记录主题相关配置信息:

  • /config/topics节点中会增加second-topic节点。

image-20240427185502734

  • /brokers/topics节点中会增加second-topic节点以及相应的子节点。 image-20240427185704383

image-20240427185646303

2.3.3.2 数据存储位置

主题创建后,需要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点配置信息可以知道,当前主题的分区数量为3,副本数量为1,那么数据存储的位置就是每个分区Leader副本所在的Broker节点。

image-20240427190121016

2.3.4 创建第三个主题

接下来我们创建第三个主题,不过创建时,我们需要设定副本参数--replication-factor,参数值为3,表示每个分区创建3个副本。

image-20240427213253128

2.3.4.1 ZooKeeper节点变化

指令执行后,当前Kafka会增加一个主题,因为指令中指定了分区数量和副本数量(--replication-factor 3),所以当前主题分区数量为3,编号为[0、1、2],副本为3,编号为[1、2、3]。为了方便集群的管理,创建Topic时,会同时在ZooKeeper中增加子节点,记录主题相关配置信息:

  • /config/topics节点中会增加third-topic节点。

image-20240427213530618

  • /brokers/topics节点中会增加third-topic节点以及相应的子节点。

image-20240427213729353

image-20240427213749881

2.3.4.2 数据存储位置

主题创建后,需要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点配置信息可以知道,当前主题的分区数量为3,副本数量为3,那么数据存储的位置就是每个分区副本所在的Broker节点。

image-20240427214156546

2.3.5 创建主题流程

Kafka中主题、分区以及副本的概念都和数据存储相关,所以是非常重要的。前面咱们演示了一下创建主题的具体操作和现象,那么接下来,我们就通过图解来了解一下Kafka是如何创建主题,并进行分区副本分配的。

2.3.5.1 命令行提交创建指令
image-20240427214315328
  1. 通过命令行提交指令,指令中会包含操作类型(--create)、Topic的名称(--topic)、主题分区数量(--partitions)、主题分区副本数量(--replication-facotr)、副本分配策略(--replica-assignment)等参数。
  2. 指令会提交到客户端进行处理,客户端获取指令后,会首先对指令参数进行校验。
    1. 操作类型取值:create、list、alter、describe、delete,只能存在一个。
    2. 分区数量为大于0的整数。
    3. 主题是否已经存在。
    4. 分区副本数量处于[1,Short.MaxValue],一般取值小于等于Broker数量。
  3. 将参数封装主题对象(NewTopic)。
  4. 创建通信对象,设定请求标记(CREATE_TOPICS),查找Controller,通过通信对象向Controller发起创建主题的网络请求。
2.3.5.2 Controller接收创建主题请求
image-20240427215442772
  1. Controller节点接收到网络请求(Acceptor),并将请求数据封装成请求对象放置在队列(requestQueue)中。
  2. 请求控制器(KafkaRequestHandler)周期性从队列中获取请求对象(BaseRequest)。
  3. 将请求对象转发给请求应用处理器(KafkaApis),根据请求对象的类型调用创建主题的方法。
2.3.5.3 创建主题
image-20240427215733055
  1. 应用请求处理器(KafkaApis)校验主题参数。

    1. 如果分区数量没有设置,那么会采用Kafka启动时加载的配置项:num.partitions(默认值为1)
    2. 如果副本数量没有设置,那么会采用Kafka启动时记载的配置项:default.replication.factor(默认值为1)
  2. 在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有指定replica-assignment参数,那么就按照Kafka内部逻辑来分配,内部逻辑按照机架信息分为两种策略:【未指定机架信息】和【指定机架信息】。当前课程中采用的是【未指定机架信息】副本分配策略:

    1. 分区起始编号设置0;

    2. 轮询所有分区,计算每一个分区的所有副本位置:

      1. 副本起始索引 =(分区编号 + 随机值)% BrokerID列表长度
      2. 其他副本索引 = 随机值(基本算法为使用随机值执行多次模运算)
      3. 通过索引位置获取副本BrokerID
      4. 保存分区以及对应的副本BrokerID列表
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      ##################################################################
      # 假设
      # 当前分区编号 : 0
      # BrokerID列表 :【1234
      # 副本数量 : 4
      # 随机值(BrokerID列表长度): 2
      # 副本分配间隔随机值(BrokerID列表长度): 2
      ##################################################################
      # 第一个副本索引:(分区编号 + 随机值)% BrokerID列表长度 =(0 + 2)% 4 = 2
      # 第一个副本所在BrokerID : 3

      # 第二个副本索引(第一个副本索引 + (1 +(副本分配间隔 + 0)% (BrokerID列表长度 - 1))) % BrokerID列表长度 =(2 +(1 +(2 + 0)% 3))% 4 = 1
      # 第二个副本所在BrokerID:2

      # 第三个副本索引:(第一个副本索引 + (1 +(副本分配间隔 + 1)% (BrokerID列表长度 - 1))) % BrokerID列表长度 =(2 +(1 +(2 + 1)% 3))% 4 = 3
      # 第三个副本所在BrokerID:4

      # 第四个副本索引:(第一个副本索引 + (1 +(副本分配间隔 + 2)% (BrokerID列表长度 - 1))) % BrokerID列表长度 = (2 +(1 +(2 + 2)% 3))% 4 = 0
      # 第四个副本所在BrokerID:1

      # 最终分区0的副本所在的Broker节点列表为【3241
      # 其他分区采用同样算法
    3. 通过ZK客户端在ZooKeeper端创建节点:

      1. /config/topics节点下,增加当前主题节点,节点类型为持久类型。
      2. /brokers/topics节点下,增加当前主题及相关节点,节点类型为持久类型。
    4. Controller节点启动后,会在/brokers/topics节点增加监听器,一旦节点发生变化,会触发相应的功能:

      1. 获取需要新增的主题信息
      2. 更新当前Controller节点保存的主题状态数据
      3. 更新分区状态机的状态为:NewPartition
      4. 更新副本状态机的状态:NewReplica
      5. 更新分区状态机的状态为:OnlinePartition,从正常的副本列表中的获取第一个作为分区的Leader副本,所有的副本作为分区的同步副本列表,我们称之为ISR(In-Sync Replica)。在ZK路径/brokers/topics/主题名上增加分区节点/partitions及状态/state节点。
      6. 更新副本状态机的状态:OnlineReplica
    5. Controller向主题的各个分区副本所属Broker节点发送LeaderAndIsrRequest请求,向所有的Broker节点发送UPDATE_METADATA请求,更新自身的缓存。

      1. Controller向分区所属的Broker发送请求
      2. Broker节点接收到请求后,根据分区状态信息,设定当前的副本为Leader或Follower,并创建底层的数据存储文件目录和空的数据文件。

      文件目录名:主题名 + 分区编号

      文件名 说明
      0000000000000000.log 数据文件,用于存储传输的数据
      0000000000000000.index 索引文件,用于定位数据
      0000000000000000.timeindex 时间戳索引文件,用于定位数据

2.4 生产消息

Topic主题已经创建好了,接下来我们就可以向该主题生产消息了,这里我们采用Java代码通过Kafka Producer API的方式生产数据。

2.4.1 生产消息的基本步骤

(一)创建Map类型的配置对象,根据场景增加相应的配置属性

  • bootstrap.servers
    • 用于建立与Kafka集群的初始连接的主机/端口对列表
    • 格式:host1:port1,host2:port2,…
  • key.serializer
    • 实现org.apache.kafka.common.serialization.Serializer接口的key的序列化器的全限定类名
  • value.serializer
    • 实现org.apache.kafka.common.serialization.Serializer接口的value的序列化器的全限定类名
  • interceptor.classes
    • 用作拦截器的类的列表,实现org.apache.kafka.clients.Producer.ProducerInterceptor接口允许您在将生产者收到的Record发布到Kafka集群之前拦截(并可能改变)它们
    • 默认情况下,没有拦截器
  • batch.size
    • 每当多个记录发送到同一分区时,生产者将尝试将记录一起批处理为更少的请求。这有助于提高客户端和服务器的性能。此配置控制默认批量大小(以字节为单位)
    • 发送到Broker的请求将包含多个批次,每个批次对应一个可发送数据的分区
    • 小批量大小将使批处理不太常见,并且可能会降低吞吐量(批量大小为零将完全禁用批处理)。非常大的批处理大小可能会更加浪费内存,因为我们总是会分配指定批处理大小的缓冲区以应对额外的记录
    • 注意:此设置给出了要发送的批量大小的上限。如果该分区累积的字节数少于这么多,我们将徘徊linger.ms时间,等待更多记录到来。linger.ms设置默认为0,这意味着即使累积的批量大小低于该batch.size设置,我们也会立即发送一条记录
  • linger.ms
    • 生产者将等待给定的延迟以允许发送其他记录,以便可以将发送分批在一起
  • acks
    • acks=0:如果设置为零,那么生产者将根本不会等待来自服务器的任何确认。该记录将立即添加到Socket缓冲区并被视为已发送。 在这种情况下,不能保证服务器已收到记录,并且重试配置不会生效(因为客户端通常不会知道任何失败)
    • acks=1:这意味着Leader会将记录写入其本地日志,但会在不等待所有Follower完全确认的情况下做出响应。在这种情况下,如果Leader在确认记录后但在Follower复制它之前立即失败,那么记录将丢失
    • acks=all:这意味着Leader将等待完整的同步副本集确认记录。这保证了只要至少一个同步副本保持活动状态,记录就不会丢失。 这是最强有力的保证。这等同于acks=-1
    • 启用幂等性要求此配置值为all。如果设置了冲突的配置并且未显式启用幂等性,则幂等性将被禁用
  • retries
    • 设置大于零的值将导致客户端重新发送发送失败并可能出现暂时性错误的任何记录。请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。如果在成功确认之前,delivery.timeout.ms配置的超时时间先到期,则生产请求将在重试次数用尽之前失败。用户通常应该更愿意不设置此配置,而是使用delivery.timeout.ms来控制重试行为
    • 在将enable.idempotence设置为false并将max.in.flight.requests.per.connection设置为大于1时允许重试可能会更改记录的顺序,因为如果将两批发送到单个分区,并且第一批失败并重试 但是第二批成功了,那么第二批中的记录可能会先出现
    • 启用幂等性要求此配置值大于0。如果设置了冲突的配置并且未显式启用幂等性,则幂等性将被禁用
  • max.in.flight.requests.per.connection
    • 客户端在阻塞之前在单个连接上发送的未确认请求的最大数量。注意,如果该配置设置大于1且enable.idempotence设置为false,则存在因重试而导致发送失败后消息重新排序的风险(即如果启用了重试);如果禁用重试或将enable.idempotence设置为true,则将保留排序。
    • 启用幂等性要求此配置的值小于或等于5。如果设置了冲突的配置并且未显式启用幂等性,则幂等性将被禁用
  • buffer.memory
    • 生产者可用于缓冲等待发送到服务器的记录的内存总字节数。如果记录发送速度快于传送到服务器的速度,生产者将阻塞max.block.ms之后将抛出异常。
  • enable.idempotence
    • 当设置为true时,生产者将确保每条消息的一份副本准确写入流中。如果为false,则生产者由于Broker故障等原因重试,可能会在流中写入重试消息的重复项
  • partitioner.ignore.keys
    • 当设置为true时,生产者将不会使用记录Key来选择分区。如果为false,则当存在Key时,生产者将根据Key的哈希值选择分区。 注意:如果使用自定义分区器,此设置无效。

(二)创建待发送数据

在Kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型:

image-20240428155755449

image-20240428155823107

相关属性必须在构建数据模型时指定,其中Topic和Value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。Value就是我们需要真正传递的数据了,而Key可以用于数据的分区定位。

(三)创建生产者对象,发送生产的数据

根据前面提供的配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者-消费者模式。

image-20240428160004995
  • 数据生产者(KafkaProducer):生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者-消费者模式下的生产者。这里我们简单介绍一下内部的数据转换处理:
    • 如果配置拦截器(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
    • 因为发送的数据为KV数据,所以需要根据配置信息中的序列化器对数据中Key和Value分别进行序列化处理。
    • 计算数据所发送的分区位置。
    • 将数据追加到数据收集器中。
  • 数据收集器(RecordAccumulator):用于收集,转换我们产生的数据,类似于生产者-消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。
    • 默认情况下,一个发送批次的数据容量为16K,这个可以通过参数batch.size进行改善。
    • 批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
    • 如果当前批次能容纳数据,那么直接将数据追加到批次中即可,如果不能容纳数据,那么会产生新的批次放入到当前分区的批次队列中,这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送。
  • 数据发送器(Sender):线程对象,用于从收集器对象中获取数据,向服务节点发送。类似于生产者-消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中。
    • 因为数据真正发送的地方是Broker节点,不是分区。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。
    • 将组合后的<节点,List<批次>>的数据封装成客户端请求(请求键为:Produce)发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。
    • Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。
image-20240428160717743

2.4.2 生产消息的基本代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class KafkaProducerTest {
public static void main(String[] args) {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key" + i, "value" + i);
// 发送消息
kafkaProducer.send(record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

2.4.3 发送消息

2.4.3.1 拦截器

生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能,所以大家可以根据实际的情况自行选择使用。

但是要注意,这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。

image-20240428161203206

image-20240428161301545

下面演示拦截器的基本使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<>(record.topic(), record.key(), record.value() + " intercepted");
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

public class KafkaProducerInterceptorTest {
public static void main(String[] args) {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 配置拦截器
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic1", "key" + i, "value" + i);
// 发送消息
kafkaProducer.send(record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

image-20240428161558461

2.4.3.2 回调方法

Kafka发送数据时,可以同时传递回调对象(Callback)用于对数据的发送结果进行对应处理,具体代码实现采用匿名类或Lambda表达式都可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class KafkaProducerCallbackTest {
public static void main(String[] args) throws Exception {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key" + i, "value" + i);
// 异步发送消息
Future<RecordMetadata> result = kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println(Thread.currentThread().getName() + "数据发送成功:" + metadata);
}
});
System.out.println(Thread.currentThread().getName() + "发送数据:" + record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

image-20240428162939197

2.4.3.3 异步发送

Kafka发送数据时,底层的实现类似于生产者-消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。

如果Kafka通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送下一条数据,我们就称之为异步发送。

image-20240428162252090
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class KafkaProducerCallbackTest {
public static void main(String[] args) throws Exception {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key" + i, "value" + i);
// 异步发送消息
Future<RecordMetadata> result = kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println(Thread.currentThread().getName() + "数据发送成功:" + metadata);
}
});
System.out.println(Thread.currentThread().getName() + "发送数据:" + record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

image-20240428163058286

2.4.3.4 同步发送

如果Kafka通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送下一条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。代码实现上,采用的是JDK1.5增加的JUC并发编程的Future接口的get方法实现。

image-20240428163206159
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class KafkaProducerCallbackTest {
public static void main(String[] args) throws Exception {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key" + i, "value" + i);
// 异步发送消息
Future<RecordMetadata> result = kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println(Thread.currentThread().getName() + "数据发送成功:" + metadata);
}
});
// 同步发送消息
result.get();
System.out.println(Thread.currentThread().getName() + "发送数据:" + record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

image-20240428163341069

2.4.4 消息分区

2.4.4.1 指定分区

Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时,也可以指定数据存储的分区编号。

image-20240428163458871
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, "key" + i, "value" + i);
// 发送消息
kafkaProducer.send(record);
}
// 关闭生产者对象
kafkaProducer.close();
2.4.4.2 未指定分区

如果不指定分区,Kafka会根据集群元数据中的主题分区来通过算法来计算分区编号并设定:

image-20240428164256414

  1. 如果指定了分区,直接使用
  2. 如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用
  3. 如果指定了数据Key,且使用Key选择分区的场合,采用murmur2非加密散列算法(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号
    image-20240428164332126
  4. 如果未指定数据Key,或不使用Key选择分区,那么Kafka会采用优化后的粘性分区策略进行分区选择:
    1. 没有分区数据负载状态信息时,会从分区列表中随机选择一个分区
      image-20240428164634056
    2. 存在分区数据负载状态信息时,根据分区数据负载状态,通过随机数获取一个权重值
      image-20240428164808894
    3. 根据这个权重值在队列分区负载状态中进行二分查找法,查找权重值的索引值
      image-20240428164905064
    4. 将这个索引值加1就是当前设定的分区
      image-20240428164925785
  5. 增加数据后,会根据当前粘性分区中生产的数据量进行判断,是不是需要切换其他的分区。判断标准就是大于等于批次大小(16K)的2倍,或大于一个批次大小(16K)且需要切换。如果满足条件,下一条数据就会放置到其他分区。
2.4.4.3 分区器

在某些场合中,指定的数据我们是需要根据自身的业务逻辑发往指定的分区的。所以需要自己定义分区编号规则,而不是采用Kafka自动设置就显得尤其必要了。Kafka早期版本中提供了两个分区器,不过在当前kafka版本中已经不推荐使用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class MyProducerPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

public class KafkaProducerPartitionerTest {
public static void main(String[] args) {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 配置分区器
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyProducerPartitioner.class.getName());
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic2", "key" + i, "value" + i);
// 发送消息
kafkaProducer.send(record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

2.4.5 消息可靠性

对于生产者发送的数据,我们有的时候是不关心数据是否已经发送成功的,我们只要发送就可以了。在这种场景中,消息可能会因为某些故障或问题导致丢失,我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失,但是在某些需要高吞吐,低可靠的系统场景中,这种方式也是可以接受的,甚至是必须的。但是在更多的场景中,我们是需要确定数据是否已经发送成功了且Kafka正确接收到数据的,也就是要保证数据不丢失,这就是所谓的消息可靠性保证。

而这个确定的过程一般是通过Kafka给我们返回的响应确认结果(Acknowledgement)来决定的,这里的响应确认结果我们也可以简称为ACK应答。根据场景,Kafka提供了3种应答处理,可以通过配置对象进行配置。

2.4.5.1 ACK=0

当生产数据时,生产者对象将数据通过网络客户端NetworkClient将数据发送到网络数据流缓冲区中的时候,Kafka就对当前的数据请求进行了响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

image-20240428165810209

通过图形,明显可以看出,这种应答方式,数据已经走网络给Kafka发送了,但这其实并不能保证Kafka能正确地接收到数据,在传输过程中如果网络出现了问题,那么数据就丢失了。也就是说这种应答确认的方式,数据的可靠性是无法保证的。不过相反,因为无需等待Kafka服务节点的确认,通信效率倒是比较高的,也就是系统吞吐量会非常高

2.4.5.2 ACK=1

当生产数据时,Kafka Leader副本将数据接收到并写入到了日志文件后,就会对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

image-20240428170030658

通过图形,可以看出,这种应答方式,数据已经存储到了分区Leader副本中,那么数据相对来讲就比较安全了,也就是可靠性比较高。之所以说相对来讲比较安全,就是因为现在只有一个节点存储了数据,而数据并没有来得及进行备份到Follower副本,那么一旦当前存储数据的Broker节点出现了故障,数据也依然会丢失。

2.4.5.3 ACK=all

当生产数据时,Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后,再对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

image-20240428170146483

通过图形,可以看出,这种应答方式,数据已经同时存储到了分区Leader副本和Follower副本中,那么数据已经非常安全了,可靠性也是最高的。此时,如果Leader副本出现了故障,那么Follower副本能够开始起作用,因为数据已经存储了,所以数据不会丢失。

不过这里需要注意,如果假设我们的分区有5个副本,编号为1,2,3,4,5:

image-20240428170249018

但是此时只有3个副本处于和Leader副本之间处于数据同步状态,那么此时分区就存在一个同步副本列表,我们称之为In-Syn-Replica,简称为ISR。此时,Kafka只要保证ISR中所有的4个副本接收到了数据,就可以对数据请求进行响应了,无需5个副本全部收到数据。

2.4.6 消息去重&有序

2.4.6.1 数据重试

由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,所以我们才会设置ACK应答机制,尽可能提高数据的可靠性。但其实在某些场景中,数据的丢失并不是真正地丢失,而是“虚假丢失”,比如咱们将ACK应答设置为1,也就是说一旦Leader副本将数据写入文件后,Kafka就可以对请求进行响应了。

image-20240428170712704

此时,如果假设由于网络故障的原因,Kafka并没有成功将ACK应答信息发送给Producer,那么此时对于Producer来讲,以为Kafka没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,也就是说在Kafka Producer眼里,数据已经丢了。

image-20240428170754361

所以在这种情况下,Kafka Producer会尝试对超时的请求数据进行重试(retry)操作。通过重试操作尝试将数据再次发送给Kafka。

image-20240428170838266

如果此时发送成功,那么Kafka就又收到了数据,而这两条数据是一样的,也就是说,导致了数据的重复。

image-20240428170935538
2.4.6.2 数据乱序

数据重试(retry)功能除了可能会导致数据重复以外,还可能会导致数据乱序。假设我们需要将编号为1,2,3的三条连续数据发送给Kafka。每条数据会对应于一个连接请求:

image-20240428171148879

此时,如果第一个数据的请求出现了故障,而第二个数据和第三个数据的请求正常,那么Broker就收到了第二个数据和第三个数据,并进行了应答。

image-20240428171221461

为了保证数据的可靠性,此时,Kafka Producer会将第一条数据重新放回到缓冲区的第一个,进行重试操作:

image-20240428171257414

如果重试成功,Broker收到第一条数据,你会发现。数据的顺序已经被打乱了。

image-20240428171319526
2.4.6.3 数据幂等性

为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作,所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,Kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据

默认幂等性是不起作用的,所以如果想要使用幂等性操作,只需要在生产者对象的配置中开启幂等性配置即可:

配置项 配置值 说明
enable.idempotence true 开启幂等性
max.in.flight.requests.per.connection 小于等于5 每个连接的在途请求数,不能大于5,取值范围为[1,5]
acks all(-1) 确认应答,固定值,不能修改
retries >0 重试次数,推荐使用Int最大值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class KafkaProducerIdemTest {
public static void main(String[] args) {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.ACKS_CONFIG, "-1");
// 开启幂等性需要满足以下条件:
// 1. 生产者的acks配置为all(-1)
// 2. 生产者的max.in.flight.requests.per.connections配置小于等于5
// 3. 生产者的retries配置大于0
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.RETRIES_CONFIG, 3);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key" + i, "value" + i);
// 发送消息
kafkaProducer.send(record);
}
// 关闭生产者对象
kafkaProducer.close();
}
}

Kafka是如何实现数据的幂等性操作呢,我们这里简单说一下流程:

  1. 开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识,Kafka中,这个标识采用的是连续的序列号数字seqnum,但是不同的生产者Producer可能序列号是一样的,所以仅仅靠seqnum还无法唯一标记数据,所以还需要同时对生产者进行区分,所以Kafka采用申请生产者ID(producerid)的方式对生产者进行区分。这样,在发送数据前,我们就需要提前申请producerid以及序列号seqnum
  2. Broker中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。
    image-20240428172017690
  3. 如果Borker当前新的请求批次数据在缓存的5个旧的批次中存在相同的,那么说明有重复,当前批次数据不做任何处理。
    image-20240428172052001
  4. 如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果是,说明是连续的,顺序没乱。那么继续,如果不是,那么说明数据已经乱了,发生异常。
    image-20240428172133131
  5. Broker根据异常返回响应,通知Producer进行重试。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后,再进行重试即可。
  6. 如果请求批次不重复且有序,那么更新缓冲区中的批次数据。将当前的批次放置在队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。
    image-20240428172234387

从上面的流程可以看出,Kafka的幂等性是通过消耗时间和性能的方式保证了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是通过原理,咱们也能明白,这种幂等性还是有缺陷的:

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
  • 只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性,也就是说如果一个producer挂掉再重启,那么重启前和重启后的producer对象会被当成两个独立的生产者,从而获取两个不同的独立的producerid,导致Broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。要想解决这个问题,可以采用后续的事务功能。
2.4.6.4 数据事务

对于幂等性的缺陷,Kafka可以采用事务的方式解决跨会话的幂等性。基本的原理就是通过事务功能管理生产者ID,保证事务开启后,生产者对象总能获取一致的生产者ID。

为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派生产者ID等都是由TransactionCoodinator负责实施的。TransactionCoodinator会将事务状态持久化到该主题中。

事务基本的实现思路就是通过配置的事务ID,将生产者ID进行绑定,然后存储在Kafka专门管理事务的内部主题__transaction_state中,而内部主题的操作是由事务协调器(TransactionCoodinator)对象完成的,这个协调器对象有点类似于咱们数据发送时的那个副本Leader。其实这种设计是很巧妙的,因为Kafka将事务ID和生产者ID看成了消息数据,然后将数据发送到一个内部主题中。这样,使用事务处理的流程和咱们自己发送数据的流程是很像的。接下来,我们就把这两个流程简单做一个对比。

普通数据发送流程

image-20240428195526286

事务数据发送流程

image-20240428195615914

通过两张图大家可以看到,基本的事务操作和数据操作是很像的,不过要注意,我们这里只是简单对比了数据发送的过程,其实它们的区别还在于数据发送后的提交过程。普通的数据操作,只要数据写入了日志,那么对于消费者来讲。数据就可以读取到了,但是事务操作中,如果数据写入了日志,但是没有提交的话,其实数据默认情况下也是不能被消费者看到的。只有提交后才能看见数据。

事务提交流程

Kafka中的事务是分布式事务,所以采用的也是二阶段提交:

第一个阶段提交事务协调器会告诉生产者事务已经提交了,所以也称之预提交操作,事务协调器会修改事务为预提交状态:

image-20240428195924781

第二个阶段提交事务协调器会向分区Leader节点中发送数据标记,通知Broker事务已经提交,然后事务协调器会修改事务为完成提交状态:

image-20240428200052256

特殊情况下,事务已经提交成功,但还是读取不到数据,那是因为当前提交成功只是一阶段提交成功,事务协调器会继续向各个Partition发送marker信息,此操作会无限重试,直至成功。但是不同的Broker可能无法全部同时接收到marker信息,此时有的Broker上的数据还是无法访问,这也是正常的,因为kafka的事务不能保证强一致性,只能保证最终数据的一致性,无法保证中间的数据是一致的。不过对于常规的场景这里已经够用了,事务协调器会不遗余力的重试,直至成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class KafkaProducerTransactionTest {
public static void main(String[] args) {
// 配置连接信息和序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.ACKS_CONFIG, "-1");
// 开启幂等性需要满足以下条件:
// 1. 生产者的acks配置为all(-1)
// 2. 生产者的max.in.flight.requests.per.connections配置小于等于5
// 3. 生产者的retries配置大于0
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.RETRIES_CONFIG, 3);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
// 配置事务,注意事务只能保证单个分区的幂等性和跨会话的幂等性
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id");
// 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(configs);
// 初始化事务
kafkaProducer.initTransactions();
try {
// 开启事务
kafkaProducer.beginTransaction();
// 发送消息 这里只是发送10条消息,实际业务场景可能会有更多的消息
for (int i = 0; i < 10; i++) {
// 创建消息对象
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key" + i, "value" + i);
// 发送消息
kafkaProducer.send(record);
}
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 回滚事务
kafkaProducer.abortTransaction();
e.printStackTrace();
} finally {
// 关闭生产者对象
kafkaProducer.close();
}
}
}
2.4.6.5 数据传输语义
传输语义 说明 例子
at most once 最多一次:不管是否能接收到,数据最多只传一次。这样数据可能会丢失, ACK=0
at least once 最少一次:消息不会丢失,如果接收不到,那么就继续发,所以会发送多次,直到收到为止,有可能出现数据重复 ACK=1
Exactly once 精准一次:消息只会一次,不会丢,也不会重复。 幂等+事务+ACK=-1

2.5 存储消息

数据已经由生产者Producer发送给Kafka集群,当Kafka接收到数据后,会将数据写入本地文件中。

image-20240428200808205

2.5.1 存储组件

  • KafkaApis : Kafka应用接口组件,当Kafka Producer向Kafka Broker发送数据请求后,Kafka Broker接收请求,会使用Apis组件进行请求类型的判断,然后选择相应的方法进行处理。

  • ReplicaManager : 副本管理器组件,用于提供主题副本的相关功能,在数据的存储前进行ACK校验和事务检查,并提供数据请求的响应处理。

  • Partition : 分区对象,主要包含分区状态变换的监控,分区上下线的处理等功能,在数据存储是主要用于对分区副本数量的相关校验,并提供追加数据的功能。

  • UnifiedLog : 同一日志管理组件,用于管理数据日志文件的新增,删除等功能,并提供数据日志文件偏移量的相关处理。

  • LocalLog : 本地日志组件,管理整个分区副本的数据日志文件。假设当前主题分区中有3个日志文件,那么3个文件都会在组件中进行管理和操作。

  • LogSegment : 文件段组件,对应具体的某一个数据日志文件,假设当前主题分区中有3个日志文件,那么3个文件每一个都会对应一个LogSegment组件,并打开文件的数据管道FileChannel。数据存储时,就是采用组件中的FileChannel实现日志数据的追加。

  • LogConfig: 日志配置对象,常用的数据存储配置。

2.5.2 数据存储

Kafka Broker节点从获取到生产者的数据请求到数据存储到文件的过程相对比较简单,只是中间会进行一些基本的数据检查和校验。所以接下来我们就将数据存储的基本流程介绍一下:

2.5.2.1 ACKS校验

Producer将数据发送给Kafka Broker时,会告知Broker当前生产者的数据生产场景,从而要求Kafka对数据请求进行应答响应确认数据的接收情况,Producer获取应答后可以进行后续的处理。

  • ACKS=0: Producer端将数据发送到网络输出流缓冲区中,此时Kafka就会进行响应。在这个场景中,数据的应答是非常快的,但是因为仅仅将数据发送到网络输出流缓冲区中,所以是无法保证Kafka Broker节点能够接收到消息,假设此时网络出现抖动不稳定导致数据丢失,而由于Kafka已经做出了确认收到的应答,所以此时Producer端就不会再次发送数据,而导致数据真正地丢失了。所以此种场景,数据的发送是不可靠的。
  • ACKS=1:Producer端将数据发送到Broker中,并保存到当前节点的数据日志文件中,Kafka就会进行确认收到数据的响应。因为数据已经保存到了文件中,也就是进行了持久化,那么相对于ACKS=0,数据就更加可靠。但是也要注意,因为Kafka是分布式的,所以集群的运行和管理是非常复杂的,难免当前Broker节点出现问题而宕掉,那么此时,消费者就消费不到我们存储的数据了,此时,数据我们还是会认为丢失了。
  • ACKS=-1:Kafka在管理分区时,会了数据的可靠性和更高的吞吐量,提供了多个副本,而多个副本之间,会选出一个副本作为数据的读写副本,称之为Leader副本,而其他副本称之Follower副本。普通场景中,所有的这些节点都是需要保存数据的。而Kafka会优先将Leader副本的数据进行保存,保存成功后,再由Follower副本向Leader副本拉取数据,进行数据同步。一旦所有的这些副本数据同步完毕后,Kafka再对Producer进行收到数据的确认。此时ACKS应答就是-1(all)。明显此种场景,多个副本本地文件都保存了数据,那么数据就更加可靠,但是相对,应答时间更长,导致Kafka吞吐量降低。

基于上面的三种生产数据的场景,在存储数据前,需要校验生产者需要的应答场景是否合法有效。

2.5.2.2 内部主题校验

Producer向Kafka Broker发送数据时,是必须指定主题Topic的,但是这个主题的名称不能是Kafka的内部主题名称。Kafka为了管理的需要,创建了2个内部主题,一个是用于事务处理的transaction_state内部主题,还有一个是用于处理消费者偏移量的consumer_offsets内部主题。生产者是无法对这两个主题生产数据的,所以在存储数据之前,需要对主题名称进行校验有效性校验。

2.5.2.3 ACKS应答及副本数量关系校验

Kafka为了数据可靠性更高一些,需要分区的所有副本都能够存储数据,但是分布式环境中难免会出现某个副本节点出现故障,暂时不能同步数据。在Kafka中,能够进行数据同步的所有副本,我们称之为In-Sync-Replicas,简称ISR列表。

当生产者Producer要求的数据ACKS应答为-1的时候,那么就必须保证能够同步数据的所有副本能够将数据保存成功后,再进行数据的确认应答。但是一种特殊情况就是,如果当前ISR列表中只有一个Broker存在,那么此时只要这一个Broker数据保存成功了,那么就产生确认应答了,数据依然是不可靠的,那么就失去了设置ACK=all的意义了,所以此时还需要对ISR列表中的副本数量进行约束,至少不能少于2个。这个数量是可以通过配置文件配置的。参数名为:min.insync.replicas。默认值为1(不推荐),所以存储数据前,也需要对ACK应答和最小分区副本数量的关系进行校验。

2.5.2.4 日志文件滚动判断

数据存储到文件中,如果数据文件太大,对于查询性能是会有很大影响的,所以副本数据文件并不是一个完整的大的数据文件,而是根据某些条件分成很多的小文件,每个小文件我们称之为文件段。其中的一个条件就是文件大小,参数名为:log.segment.bytes。默认值为1G。如果当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息。此时日志文件就需要滚动生产新的。

除了文件大小外,还有时间间隔,如果文件段第一批数据有时间戳,那么当前批次数据的时间戳和第一批数据的时间戳间隔大于滚动阈值,那么日志文件也会滚动生产新的。如果文件段第一批数据没有时间戳,那么就用当前时间戳和文件创建时间戳进行比对,如果大于滚动阈值,那么日志文件也会滚动生产新的。这个阈值参数名为:log.roll.hours,默认为7天。如果时间到达,但是文件不满1G,依然会滚动生产新的数据文件。

如果索引文件或时间索引文件满了,或者索引文件无法存放当前索引数据了,那么日志文件也会滚动生产新的。

基于以上的原则,需要在保存数据前进行判断。

2.5.2.5 请求数据重复性校验

因为Kafka允许生产者进行数据重试操作,所以因为一些特殊的情况,就会导致数据请求被Kafka重复获取导致数据重复,所以为了数据的幂等性操作,需要在Broker端对数据进行重复性校验。这里的重复性校验只能对同一个主题分区的5个在途请求中数据进行校验,所以需要在生产者端进行相关配置。

2.5.2.6 请求数据序列号校验

因为Kafka允许生产者进行数据重试操作,所以因为一些特殊的情况,就会导致数据请求被Kafka重复获取导致数据顺序发生改变从而引起数据乱序。为了防止数据乱序,需要在Broker端对数据的序列号进行连续性(插入数据序列号和Broker分区缓冲的最后一个数据的序列号差值为1)校验。

2.5.2.7 数据存储

将数据通过LogSegmentFileChannel对象。将数据写入日志文件,写入完成后,更新当前日志文件的数据偏移量。

2.5.3 存储文件格式

我们已经将数据存储到了日志文件中,当然除了日志文件还有其他的一些文件,所以接下来我们就了解一下这些文件:

2.5.3.1 数据日志文件

Kafka系统早期设计的目的就是日志数据的采集和传输,所以数据是使用log文件进行保存的。我们所说的数据文件就是以.log作为扩展名的日志文件。文件名长度为20位长度的数字字符串,数字含义为当前日志文件的第一批数据的基础偏移量,也就是文件中保存的第一条数据偏移量。字符串数字位数不够的,前面补0。

image-20240428204630492

我们的常规数据主要分为两部分:批次头 + 数据体

image-20240428205810058

image-20240428205846182

如果我们发送的数据是一条为(key1,value1)的数据,那么Kafka当前会向日志文件增加的数据大小为:

1
2
3
4
# 追加数据字节计算
批次头 = 61
数据体 = 1 + 1 + 1 + 4 + 1 + 6 + 1 + 1 + 1 = 17
总的字节大小为61 + 17 = 78

image-20240428205139613

如果我们发送的数据是两条为(key1,value1),(key2,value2)的数据,那么Kafka当前会向日志文件增加的数据大小为:

image-20240428210529334

image-20240428210615311

2.5.3.2 数据索引文件

Kafka的基础设置中,数据日志文件到达1G才会滚动生产新的文件。那么从1G文件中想要快速获取我们想要的数据,效率还是比较低的。通过前面的介绍,如果我们能知道数据在文件中的位置(position),那么定位数据就会快很多,问题在于我们如何才能在知道这个位置呢。

Kafka在存储数据时,都会保存数据的偏移量信息,而偏移量是从0开始计算的。简单理解就是数据的保存顺序。比如第一条保存的数据,那么偏移量就是0,第二条保存的数据偏移量就是1,但是这个偏移量只是告诉我们数据的保存顺序,却无法定位数据,不过需要注意的是,每条数据的大小是可以确定的(参考上一个小节的内容)。既然可以确定,那么数据存放在文件的位置起始也就是确定了,所以Kafka在保存数据时,其实是可以同时保存位置的,那么我们在访问数据时,只要通过偏移量其实就可以快速定位日志文件的数据了。

image-20240428211248196

不过这依然有问题,就是数据量太多了,对应的偏移量也太多了,并且主题分区的数据文件会有很多,那我们是如何知道数据在哪一个文件中呢?为了定位方便Kafka在提供日志文件保存数据的同时,还提供了用于数据定位的索引文件,索引文件中保存的就是逻辑偏移量和数据物理存储位置(偏移量)的对应关系。并且还记得吗,每个数据日志文件的名称就是当前文件中数据起始偏移量,所以通过偏移量就可以快速选取文件以及定位数据的位置从而快速找到数据。这种感觉就有点像Java的HashMap通过Key可以快速找到Value的感觉一样,如果不知道Key,那么从HashMap中获取Value是不是就特别慢。道理是一样的。

Kafka的数据索引文件都保存了什么呢?咱们来看一下:

image-20240428211602096

通过图片可以看到,索引文件中保存的就是逻辑偏移量和物理偏移量位置的关系。有了这个索引文件,那么我们根据数据的顺序获取数据就非常的方便和高效了,Kafka在查询定位时其实采用的就是二分查找法

不过,为什么Kafka的索引文件是不连续的呢,那是因为如果每条数据如果把偏移量定位都保存下来,数据量也不小,还有就是如果索引数据丢了几条,其实并不会太影响查询效率,因为Kafka底层实现时,采用的是虚拟内存映射技术mmap,将内存和index文件进行双向映射,操作内存数据就等同于操作文件,所以效率是非常高的,而且为了效率,Kafka默认情况下,4KB的日志数据才会记录一次索引,但是这个是可以进行配置修改的,参数为log.index.interval.bytes,默认值为4096。所以我们有的时候会将Kafka的不连续索引数据称之为稀疏索引

2.5.3.3 数据时间戳索引文件

某些场景中,我们不想根据顺序(偏移量)获取Kafka的数据,而是想根据时间戳来获取的数据。这个时候,可没有对应的偏移量来定位数据,那么查找的效率就非常低了,因此Kafka还提供了时间索引文件,咱们来看看它的内容是什么:

image-20240428212545858

通过图片,大家可以看到,这个时间索引文件起始就是将时间戳和偏移量对应起来了,那么此时通过时间戳就可以找到偏移量,再通过偏移量找到文件位置信息,再通过文件位置信息找到数据不就非常方便了吗。

2.5.4 数据刷写

在Linux系统中,当我们把数据写入文件系统之后,其实数据在操作系统的PageCache(页缓冲)里面,并没有刷到磁盘上。如果操作系统挂了,数据就丢失了。一方面,应用程序可以调用fsync这个系统调用来强制刷盘,另一方面,操作系统有后台线程定时刷盘。频繁调用fsync会影响性能,需要在性能和可靠性之间进行权衡。实际上,Kafka提供了参数进行数据的刷写:

  • log.flush.interval.messages :达到消息数量时,会将数据flush到日志文件中。
  • log.flush.interval.ms :间隔多少时间(ms),执行一次强制的flush操作。
  • flush.scheduler.interval.ms:所有日志刷新到磁盘的频率

log.flush.interval.messages和log.flush.interval.ms无论哪个达到,都会flush。官方不建议通过上述的三个参数来强制写盘,数据的可靠性应该通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响

2.5.5 副本同步

Kafka中,分区的某个副本会被指定为Leader,负责响应客户端的读写请求。分区中的其他副本自动成为Follower,主动拉取(同步)Leader副本中的数据,写入自己本地日志,确保所有副本上的数据是一致的。

image-20240428213137350
2.5.5.1 启动数据同步线程

Kafka创建主题时,会根据副本分配策略向指定的Broker节点发出请求,将不同的副本节点设定为Leader或Follower。一旦某一个Broker节点设定为Follower节点,那么Follower节点会启动数据同步线程ReplicaFetcherThread,从Leader副本节点同步数据。线程运行后,会不断重复两个操作:截断(truncate)和抓取(fetch)。

  • 截断:为了保证分区副本的数据一致性,当分区存在Leader Epoch值时,会将副本的本地日志截断到Leader Epoch对应的最新位移处。如果分区不存在对应的Leader Epoch记录,那么依然使用原来的高水位机制,将日志调整到高水位值处。
  • 抓取:向Leader同步最新的数据。
2.5.5.2 生成数据同步请求

启动线程后,需要周期地向Leader节点发送FETCH请求,用于从Leader获取数据。等待Leader节点的响应的过程中,会阻塞当前同步数据线程。

2.5.5.3 处理数据响应

当Leader副本返回响应数据时,其中会包含多个分区数据,当前副本会遍历每一个分区,将分区数据写入数据文件中。

image-20240428214051211

2.5.5.4 更新数据偏移量

当Leader副本返回响应数据时,除了包含多个分区数据外,还包含了和偏移量相关的数据HW和LSO,副本需要根据场景对Leader返回的不同偏移量进行更新。

  • Offset:Kafka的每个分区的数据都是有序的,所谓的数据偏移量,指的就是Kafka在保存数据时,用于快速定位数据的标识,类似于Java中数组的索引,从0开始。Kafka的数据文件以及数据访问中包含了大量和偏移量的相关的操作。
  • LSO:起始偏移量(Log Start Offset),每个分区副本都有起始偏移量,用于表示副本数据的起始偏移位置,初始值为0。LSO一般情况下是无需更新的,但是如果数据过期,或用户手动删除数据时,Leader的Log Start Offset可能发生变化,Follower副本的日志需要和Leader保持严格的一致,因此,如果Leader的该值发生变化,Follower自然也要发生变化保持一致。
  • LEO:日志末端位移(Log End Offset),表示下一条待写入消息的offset,每个分区副本都会记录自己的LEO。对于Follower副本而言,它能读取到Leader副本LEO值以下的所有消息。
  • HW:高水位值(High Watermark),定义了消息可见性,标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位offset之前的消息,同时这个偏移量还可以帮助Kafka完成副本数据同步操作。

2.5.6 数据一致性

2.5.6.1 数据一致性表现

Kafka的设计目标是:高吞吐、高并发、高性能。为了做到以上三点,它必须设计成分布式的,多台机器可以同时提供读写,并且需要为数据的存储做冗余备份。

image-20240428214612711

图中的主题有3个分区,每个分区有3个副本,这样数据可以冗余存储,提高了数据的可用性。并且3个副本有两种角色,Leader和Follower,Follower副本会同步Leader副本的数据。一旦Leader副本挂了,Follower副本可以选举成为新的Leader副本,这样就提升了分区可用性,但是相对的,在提升了分区可用性的同时,也就牺牲了数据的一致性。

我们来看这样的一个场景:一个分区有3个副本,一个Leader和两个Follower。Leader副本作为数据的读写副本,所以生产者的数据都会发送给Leader副本,而两个Follower副本会周期性地同步Leader副本的数据,但是因为网络,资源等因素的制约,同步数据的过程是有一定延迟的,所以3个副本之间的数据可能是不同的。具体如下图所示:

image-20240428214755561

此时,假设Leader副本因为意外原因宕掉了,那么Kafka为了提高分区可用性,此时会选择2个Follower副本中的一个作为Leader对外提供数据服务。此时我们就会发现,对于消费者而言,之前Leader副本能访问的数据是D,但是重新选择Leader副本后,能访问的数据就变成了C,这样消费者就会认为数据丢失了,也就是所谓的数据不一致了。

image-20240428214843094

为了提升数据的一致性,Kafka引入了高水位(HW :High Watermark)机制,Kafka在不同的副本之间维护了一个水位线的机制(其实也是一个偏移量的概念),消费者只能读取到水位线以下的的数据。这就是所谓的木桶理论:木桶中容纳水的高度,只能是水桶中最短的那块木板的高度。这里将整个分区看成一个木桶,其中的数据看成水,而每一个副本就是木桶上的一块木板,那么这个分区(木桶)可以被消费者消费的数据(容纳的水)其实就是数据最少的那个副本的最后数据位置(木板高度)。

也就是说,消费者一开始在消费Leader的时候,虽然Leader副本中已经有a、b、c、d 4条数据,但是由于高水位线的限制,所以也只能消费到a、b这两条数据。

image-20240428214957062

这样即使Leader挂掉了,但是对于消费者来讲,消费到的数据其实还是一样的,因为它能看到的数据是一样的,也就是说,消费者不会认为数据不一致。

image-20240428215023891
2.5.6.2 HW在副本之间的传递

HW高水位线会随着Follower的数据同步操作,而不断上涨,也就是说,Follower同步的数据越多,那么水位线也就越高,那么消费者能访问的数据也就越多。接下来,我们就看一看,Follower在同步数据时HW的变化。

首先,初始状态下,Leader和Follower都没有数据,所以和偏移量相关的值都是初始值0,而由于Leader需要管理Follower,所以也包含着Follower的相关偏移量(LEO)数据。

image-20240428215219716

生产者向Leader发送两条数据,Leader收到数据后,会更新自身的偏移量信息。

1
2
Leader副本偏移量更新:
LEO=LEO+2=2
image-20240428215306657

接下来,Follower开始同步Leader的数据,同步数据时,会将自身的LEO值作为参数传递给Leader。此时,Leader会将数据传递给Follower,且同时Leader会根据所有副本的LEO值更新HW。

image-20240428215349451

1
2
Leader副本偏移量更新:
HW = Math.max[HW, min(LeaderLEO,F1-LEO,F2-LEO)]=0
image-20240428215433214

由于两个Follower的数据拉取速率不一致,所以Follower-1抓取了2条数据,而Follower-2抓取了1条数据。Follower再收到数据后,会将数据写入文件,并更新自身的偏移量信息。

1
2
3
4
5
6
Follower-1副本偏移量更新:
LEO=LEO+2=2
HW = Math.min[LeaderHW, LEO]=0
Follower-2副本偏移量更新:
LEO=LEO+1=1
HW = Math.min[LeaderHW, LEO]=0
image-20240428215607287

接下来Leader收到了生产者的数据C,那么此时会根据相同的方式更新自身的偏移量信息:

1
2
Leader副本偏移量更新:
LEO=LEO+1=3
image-20240428215644699

Follower接着向Leader发送Fetch请求,同样会将最新的LEO作为参数传递给Leader。Leader收到请求后,会更新自身的偏移量信息。

1
2
Leader副本偏移量更新:
HW = Math.max[HW, min(LeaderLEO,F1-LEO,F2-LEO)]=1
image-20240428215743927

此时,Leader会将数据发送给Follower,同时也会将HW一起发送。

image-20240428215809592

Follower收到数据后,会将数据写入文件,并更新自身偏移量信息:

1
2
3
4
5
6
Follower-1副本偏移量更新:
LEO=LEO+1=3
HW = Math.min[LeaderHW, LEO]=1
Follower-2副本偏移量更新:
LEO=LEO+1=2
HW = Math.min[LeaderHW, LEO]=1
image-20240428215902766

因为Follower会不断重复Fetch数据的过程,所以前面的操作会不断地重复。最终,Follower副本和Leader副本的数据和偏移量是保持一致的。

image-20240428215925337

上面演示了副本列表ISR中Follower副本和Leader副本之间HW偏移量的变化过程,但特殊情况是例外的。比如当前副本列表ISR中,只剩下了Leader一个副本的场合下,是不需要等待其他副本的,直接推高HW即可。

2.5.6.3 ISR(In-Sync-Replicas)伸缩

Kafka的分区副本中只有Leader副本具有数据写入的功能,而Follower副本需要不断向Leader发出申请,进行数据的同步。这里所有同步的副本会形成一个列表,我们称之为同步副本列表也可以简称ISR,除了ISR以外,还有已分配的副本列表(Assigned Replicas),简称AR。这里的AR其实不仅仅包含ISR,还包含了没有同步的副本列表(Out-of-Sync Replicas),简称OSR。

生产者Producer生产数据时,ACKS应答机制如果设置为all(-1),那此时就需要保证同步副本列表ISR中的所有副本全部接收完毕后,Kafka才会进行确认应答。数据存储时,只有ISR中的所有副本LEO数据都更新了,才有可能推高HW偏移量的值。这就可以看出,ISR在Kafka集群的管理中是非常重要的。

在Broker节点中,有一个副本管理器组件(ReplicaManager),除了读写副本、管理分区和副本的功能之外,还有一个重要的功能,那就是管理ISR。这里的管理主要体现在两个方面:

  • 周期性地查看ISR中的副本集合是否需要收缩。这里的收缩是指把ISR副本集合中那些与Leader差距过大的副本移除的过程。
    image-20240428220252472

    相对的,有收缩就会有扩大,在Follower抓取数据时,判断副本状态,满足扩大ISR条件后,就可以提交分区变更请求。完成ISR列表的变更。

  • 向集群Broker传播ISR的变更。ISR发生变化(包含Shrink和Expand)都会执行传播逻辑。ReplicaManager每间隔2500毫秒就会根据条件,将ISR变化的结果传递给集群的其他Broker。
    image-20240428220408804

2.6 消费消息

2.6.1 消费消息的基本步骤

(一)创建Map类型的配置对象,根据场景增加相应的配置属性

  • bootstrap.servers
    • 用于建立与Kafka集群的初始连接的主机/端口对列表
    • 格式:host1:port1,host2:port2,…
  • key.serializer
    • 实现org.apache.kafka.common.serialization.Deserializer接口的key的反序列化器的全限定类名
  • value.serializer
    • 实现org.apache.kafka.common.serialization.Deserializer接口的value的反序列化器的全限定类名
  • group.id
    • 标识该消费者所属的消费者组的唯一字符串
  • auto.offset.reset
    • 当Kafka中没有初始消费者偏移量或者当前消费者偏移量在Broker上不再存在时,该怎么办:
      • earliest:自动将消费者偏移量重置为最早偏移量
      • latest:自动将消费者偏移量重置为最新偏移量
      • none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常
  • group.instance.id
    • 消费者实例ID,如果指定,那么在消费者组中使用此ID作为memberId前缀
  • partition.assignment.strategy
    • 默认分配器是[RangeAssignor, CooperativeStickyAssignor],默认情况下将使用RangeAssignor,但允许升级到 CooperativeStickyAssignor,只需一次ReBlance即可从列表中删除RangeAssignor
  • enable.auto.commit
    • 如果为true,消费者的偏移量将在后台定期提交
  • auto.commit.interval.ms
    • 如果enable.auto.commit设置为true,则消费者偏移量自动提交到Kafka的频率(以毫秒为单位),默认5000ms

(二)创建消费者对象

根据配置创建消费者对象KafkaConsumer,向Kafka订阅(subscribe)主题消息,并向Kafka发送请求(poll)获取数据。

(三)获取数据

Kafka会根据消费者发送的参数,返回数据对象ConsumerRecord。返回的数据对象中包括指定的数据。

image-20240428222140144

2.6.2 消费消息的基本代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class KafkaConsumerTest {
public static void main(String[] args) {
// 配置连接信息和反序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(configs);
// 订阅主题
kafkaConsumer.subscribe(Collections.singletonList("test"));
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
}
}

2.6.3 消费消息的基本原理

2.6.3.1 消费者组

Kafka的主题如果就一个分区的话,那么在硬件配置相同的情况下,消费者Consumer消费主题数据的方式没有什么太大的差别。

image-20240428222516419

不过,Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,它的主题是允许多个分区的,那么就会发现不同的消费数据的方式区别还是很大的。

  • 如果数据由Kafka进行推送(push),那么多个分区的数据同时推送给消费者进行处理,明显一个消费者的消费能力是有限的,那么消费者无法快速处理数据,就会导致数据的积压,从而导致网络,存储等资源造成极大的压力,影响吞吐量和数据传输效率。
    image-20240428222620257

  • 如果Kafka的分区数据在内部可以存储的时间更长一些,再由消费者根据自己的消费能力向Kafka拉取(poll)数据,那么整个数据处理的通道就会更顺畅一些。Kafka的Consumer就采用的这种拉取数据的方式。

    image-20240428222716360

消费者可以根据自身的消费能力主动拉取Kafka的数据,但是毕竟自身的消费能力有限,如果主题分区的数据过多,那么消费的时间就会很长。对于Kafka来讲,数据就需要长时间的进行存储,那么对Kafka集群资源的压力就非常大。如果希望提高消费者的消费能力,并且减少Kafka集群的存储资源压力。所以有必要对消费者进行横向伸缩,从而提高消息消费速率。

image-20240428222807751

不过这么做有一个问题,就是每一个消费者是独立,那么一个消费者就不能消费主题中的全部数据,简单来讲,就是对于某一个消费者个体来讲,主题中的部分数据是没有消费到的,也就会认为数据丢了,这个该如何解决呢?那如果我们将这多个消费者当成一个整体,是不是就可以了呢?这就是所谓的消费者组Consumer Group。在Kafka中,每个消费者都对应一个消费组,消费者可以是一个线程,一个进程,一个服务实例,如果Kafka想要消费消息,那么需要指定消费哪个Topic的消息以及自己的消费组id(groupId)。

image-20240428223038793
2.6.3.2 组调度器GroupCoordinator

消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在Kafka中,我们称之为消费者组调度器(Group Coordinator)

Group Coordinator是Broker上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个Broker都有一个Group Coordinator对象,负责管理多个消费者组,但每个消费者组只有一个Group Coordinator。

image-20240428223648494
2.6.3.3 消费者分配策略Assignor

消费者想要拉取主题分区的数据,首先必须要加入到一个组中。

image-20240428223731697

但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办?所以这里,我们需要给大家介绍一下,Kafka中基本的消费者组中的消费者和分区之间的分配规则:

  • 同一个消费者组的消费者都订阅同一个主题,所以消费者组中的多个消费者可以共同消费一个主题中的所有数据。
  • 为了避免数据被重复消费,所以主题一个分区的数据只能被组中的一个消费者消费,也就是说不能两个消费者同时消费一个分区的数据。但是反过来,一个消费者是可以消费多个分区数据的。
    image-20240428223907358
  • 消费者组中的消费者数量最好不要超出主题分区的数据,就会导致多出的消费者是无法消费数据的,造成了资源的浪费。
    image-20240428223929390

消费者中的每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者的Leader决定的,这个Leader我们称之为群主。群主是多个消费者中,第一个加入组中的消费者,其他消费者我们称之为Follower,称呼上有点类似与分区的Leader和Follower。

当消费者加入群组的时候,会发送一个JoinGroup请求。群主负责给每一个消费者分配分区。每个消费者只知道自己的分配信息,只有群主知道群组内所有消费者的分配信息。

指定分配策略的基本流程

  1. 第一个消费者设定group.id为test,向当前负载最小的节点发送FIND_COORDINATOR请求查找消费组调度器
    image-20240428224116726

  2. 找到消费调度器后,消费者向调度器节点发出JOIN_GROUP请求,加入消费者组

    image-20240428224220155

  3. 当前消费者当选为群主后,根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器
    image-20240428224313324

  4. 此时第二个消费者设定group.id为test,申请加入消费者组
    image-20240428224336839

  5. 加入成功后,Kafka将消费者组状态切换到准备rebalance关闭和消费者的所有连接,等待它们重新加入。客户端重新申请加入,Kafka从消费者组中挑选第一个作为Leader,其它的作为Follower。(步骤和之前相同,我们假设还是之前的消费者为Leader
    image-20240428224519668

  6. Leader会按照分配策略对分区进行重分配,并将方案发送给调度器,由调度器通知所有的成员新的分配方案。消费者组成员会按照新的方案重新消费数据
    image-20240428224600552

Kafka提供的分区分配策略常用的有4个:

  • RoundRobinAssignor(轮询分配策略)

每个消费者组中的消费者都会含有一个自动生产的UUID作为memberid。

image-20240428224701418

轮询策略中会将每个消费者按照memberid进行排序,所有member消费的主题分区根据主题名称进行排序。

image-20240428224804396

将主题分区轮询分配给对应的订阅用户,注意未订阅当前轮询主题的消费者会跳过。

image-20240428224834085

image-20240428224840167

从图中可以看出,轮询分配策略是存在缺点的,并不是那么的均衡,如果test1-2分区能够分配给消费者ccc是不是就完美了。

  • RangeAssignor(范围分配策略)

按照每个Topic的Partition数计算出每个消费者应该分配的分区数量,然后分配,分配的原则就是一个主题的分区尽可能的平均分,如果不能平均分,那就按顺序向前补齐即可。

1
2
3
4
5
6
#所谓按顺序向前补齐就是:
假设【1,2,3,4,5】5个分区分给2个消费者:
5 / 2 = 2, 5 % 2 = 1 => 剩余的一个补在第一个中[2+1][2] => 结果为[1,2,3][4,5]

假设【1,2,3,4,5】5个分区分到3个消费者:
5 / 3 = 1, 5 % 3 = 2 => 剩余的两个补在第一个和第二个中[1+1][1+1][1] => 结果为[1,2][3,4][5]
image-20240428225126226

缺点:Range分配策略针对单个Topic的情况下显得比较均衡,但是假如Topic多的话, member排序靠前的可能会比member排序靠后的负载多很多

image-20240428225225932

还有就是如果新增或移除消费者成员,那么会导致每个消费者都需要去建立新的分区节点的连接,更新本地的分区缓存,效率比较低。

image-20240428225430546
  • StickyAssignor(粘性分区)

在第一次分配后,每个消费者组成员都保留分配给自己的分区信息。如果有消费者加入或退出,那么在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或退出消费者的分区进行分配。

image-20240428225550064 image-20240428225609098

从图中可以看出,粘性分区分配策略分配的会更加均匀和高效一些。

  • CooperativeStickyAssignor

前面的三种分配策略再进行重分配时使用的是EAGER协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从Kafka2.4版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是COOPERATIVE协议,特点就是在整个再分配的过程中从图中可以看出,粘性分区分配策略分配的会更加均匀和高效一些,COOPERATIVE协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。

Kafka消费者默认的分区分配就是[RangeAssignor,CooperativeStickyAssignor]。

2.6.3.4 消费者偏移量

消费者偏移量是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据,如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。

在消费者的配置中,我们可以增加偏移量相关参数auto.offset.reset,用于从最开始获取主题数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class KafkaConsumerOffsetTest {
public static void main(String[] args) {
// 配置连接信息和反序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
// 设置重启后消费者偏移量,earliest表示从最早的位置开始消费,如果开启了自动提交偏移量则以上次自动提交的偏移量为准
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(configs);
// 订阅主题
kafkaConsumer.subscribe(Collections.singletonList("test"));
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
}
}

image-20240429142907777

除了从最开始的偏移量或最后的偏移量读取数据以外,Kafka还支持从指定的偏移量的位置开始消费数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class KafkaConsumerSpecificOffsetTest {
public static void main(String[] args) {
// 配置连接信息和反序列化器
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(configs);
// 订阅主题
kafkaConsumer.subscribe(Collections.singletonList("test"));
// 获取集群元数据信息
kafkaConsumer.poll(Duration.ofSeconds(3));
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 配置需要消费的主题及偏移量
for (TopicPartition partition : assignment) {
// 设置偏移量为0,从头开始消费
kafkaConsumer.seek(partition, 0);
}
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
}
}

生产环境中,消费者可能因为某些原因或故障重新启动消费,那么如果不知道之前消费数据的位置,重启后再消费,就可能重复消费(earliest)漏消费(latest)。所以Kafka提供了保存消费者偏移量的功能,而这个功能需要由消费者进行提交操作。这样消费者重启后就可以根据之前提交的偏移量进行消费了。

注意,一旦消费者提交了偏移量,那么Kafka会优先使用提交的偏移量进行消费。此时,auto.offset.reset参数是不起作用的。

  • 自动提交

所谓的自动提交就是消费者消费完数据后,无需告知Kafka当前消费数据的偏移量,而是由消费者客户端API周期性地将消费的偏移量提交到Kafka中。这个周期默认为5000ms,可以通过配置进行修改。

image-20240429193535365

  • 手动提交
    基于时间周期的消费者偏移量提交是我们无法控制的,一旦参数设置的不合理,或单位时间内数据量消费的很多,却没有来及的自动提交,那么数据就会重复消费。所以Kafka也支持消费偏移量的手动提交,也就是说当消费者消费完数据后,自行通过API进行提交。不过为了考虑效率和安全,Kafka同时提供了异步提交和同步提交两种方式供我们选择。注意:需要禁用自动提交auto.offset.reset=false,才能开启手动提交。
    • 异步提交
      向Kafka发送偏移量提交请求后,就可以直接消费下一批数据,因为无需等待Kafka的提交确认,所以无法知道当前的偏移量一定提交成功,所以安全性比较低,但相对消费性能会提高。
      image-20240429194300516
    • 同步提交
      必须等待Kafka完成偏移量提交请求的响应后,才可以消费下一批数据,一旦提交失败,会进行重试处理,尽可能保证偏移量提交成功,但是依然可能因为一些情况导致提交请求失败。此种方式消费效率比较低,但是安全性高。
      image-20240429194447160
2.6.3.5 消费者事务

无论偏移量使用自动提交还是,手动提交,特殊场景中数据都有可能会出现重复消费。

image-20240429200434078

如果提前提交偏移量,再处理业务,又可能出现数据丢失的情况。

image-20240429200543721

对于单独的Consumer来讲,事务保证会比较弱,尤其是无法保证提交的信息被精确消费,主要原因就是消费者可以通过偏移量访问信息,而不同的数据文件生命周期不同,同一事务的信息可能会因为重启导致被删除的情况。一般情况下,想要完成Kafka消费者端的事务处理,需要将数据消费过程和偏移量提交过程进行原子性绑定,也就是说数据处理完了,必须要保证偏移量正确提交,才可以做下一步的操作,如果偏移量提交失败,那么数据就恢复成处理之前的效果。

对于生产者事务而言,消费者消费的数据也会受到限制。默认情况下,消费者只能消费到生产者提交的数据,也就是未提交完成的数据,消费者是看不到的。如果想要消费到未提交的数据,需要更高消费事务隔离级别。

image-20240429202953305

2.6.3.6 偏移量保存

由于消费者在消费消息的时候可能会由于各种原因而断开消费,当重新启动消费者时我们需要让它接着上次消费的位置offset继续消费,因此消费者需要实时的记录自己消费的位置。

0.90版本之前,这个信息是记录在ZooKeeper内的,在0.90之后的版本,offset保存在__consumer_offsets这个Topic内。每个consumer会定期将自己消费分区的offset提交给kafka内部Topic:__consumer_offsets,提交过去的时候,key是consumerGroupId+Topic+分区号,value就是当前offset的值,Kafka会定期清理Topic里的消息,最后就保留最新的那条数据。

因为__consumer_offsets可能会接收高并发的请求,Kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),均匀分配到Kafka集群的多个Broker中。Kafka采用hash(consumerGroupId) % __consumer_offsets主题的分区数来计算我们的偏移量提交到哪一个分区。因为偏移量也是保存到主题中的,所以保存的过程和生产者生产数据的过程基本相同。

2.6.3.7 消费数据

消费者消费数据时,一般情况下,只是设定了订阅的主题名称,那是如何消费到数据的呢。我们这里说一下服务端拉取数据的基本流程。

image-20240429203635918
  1. 服务端获取到用户拉取数据的请求:Kafka消费客户端会向Broker发送拉取数据的请求FetchRequest,服务端Broker获取到请求后根据请求标记FETCH交给应用处理接口KafkaApis进行处理。
  2. 通过副本管理器拉取数据:副本管理器需要确定当前拉取数据的分区,然后进行数据的读取操作。
  3. 判定首选副本:2.4版本前,数据读写的分区都是Leader分区,从2.4版本后,Kafka支持Follower副本进行读取。主要原因就是跨机房或者说跨数据中心的场景,为了节约流量资源,可以从当前机房或数据中心的副本中获取数据。这个副本称之首选副本。
  4. 拉取分区数据:Kafka的底层读取数据是采用日志段LogSegment对象进行操作的。
  5. 零拷贝:为了提高数据读取效率,Kafka的底层采用NIO提供的FileChannel零拷贝技术,直接从操作系统内核中进行数据传输,提高数据拉取的效率。