You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1240 lines
136 KiB
Markdown

# Kafka
## 0. 目录
- Kafka Kafka基本概念及配置
Kafka 客户端实践及原理剖析
Kafka 内核及源码
Kafka 管理和监控
Kafka 流处理
## 1. Kafka基本概念及配置
- 官方QuickStart: https://kafka.apache.org/quickstart
- 官方文档: https://kafka.apache.org/documentation/
- 最新版本 3.2.0 【截止2022-7-20】
- 学习版本 kafka_2.11-2.4.0【前面是scala后面是kafka】 zookeeper-3.5.7【截止2022-7-20】
### 1.1 概念
- 概述
- Kafka 是分布式的流处理平台
- Kafka 是基于Zookeeper的分布式消息系统
- Kafka具有高吞吐率、高性能、实时、高可靠等特点
- Kafka 的组成概念
- Topic: 一个虚拟的概念, 由1到多个Partitions组成
- Partition: 实际消息存储单位
- Producer: 消息生产者
- Consumer: 消息消费者
-
### 1.2 安装Zookeeper(快捷)
```shell
# 修改配置文件
cp zoo_sample.cfg zoo.cfg
# 需要修改参数
dataDir=指定一个存储配置文件的目录
# 启动
./zkServer.sh start
# 连接测试
./zkCli.sh
```
### 1.3 安装Kafka
```shell
# 修改基本的配置文件
vim server.properties
# 需要修改的配置项
listeners=PLAINTEXT://[你的ip地址]:9092
advertised.listeners=PLAINTEXT://[你的ip地址]:9092
log.dirs=你的kafka的日志目录
zookeeper.connect=[你的ip地址]:2181
```
### 1.4 Kafka 的线上方案
- 操作系统
- 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的 快速数据传输特性。
- Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证, 千万不要应用于生产环境。
- 磁盘
- **普通的机械磁盘** 即可, Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘 最大的劣势,即随机读写操作慢。
- 追求性价比的公司可以**不搭建 RAID**,使用**普通磁盘组成存储空间即可**。 [Kafka 1.1开始正式支持JBOD了。再说Kafka本身在软件层面也提供了冗余的机制来对 抗磁盘损坏。]
- 磁盘容量
- 在规划磁盘容量时你需要考虑下面这几个元素:
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
- 例子:
- 假设你所在公司有个业务每天需要向 Kafka 集群发送 **1 亿条消息**,每条消息**保存两份**以防止数据丢失,另外消息默认**保存两周** 时间。现在假设消息的**平均大小是 1KB**?
- 每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空 间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息 数据还有其他类型的数据,
- 比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空 间,因此总的存储容量就是 220GB。既然要保存两周那么整体容量即为 220GB * 14 大约 3TB 左右。Kafka 支持数据的压缩,
假设压缩比是 0.75,那么最后你需要规划的存储 空间就是 0.75 * 3 = 2.25TB。
- 带宽
- 对于 Kafka 这种通过网络大量进行数据传输的框架而言,**带宽特别容易成为瓶颈**。事实 上,在我接触的真实案例当中,带宽资源不足导致 Kafka 出现性能问题的比例至少占 60% 以上。
如果你的环境中还涉及跨机房传输,那么情况可能就更糟了。
- 带宽也主要有 两种:1Gbps 的千兆网络和 10Gbps 的万兆网络,特别是千兆网络应该是一般公司网络的标准配置了。
- 与其说是带宽资源的规划,其实真正要规划的是所需的 Kafka 服务器的数量。
- 假设你公司 的机房环境是千兆网络,即 1Gbps现在你有个业务其业务目标或 SLA 是在 1 小时内处 理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
- 让我们来计算一下,由于带宽是 1Gbps即每秒处理 1Gb 的数据,假设每台 Kafka 服务 器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混布其他服务,毕竟真实环境中不建议这么做。
通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其 他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比 较合理的值,
也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源, 故通常要再额外预留出 2/3 的资源,
即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需 要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少 此值。 好了,有了 240Mbps
我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根 据这个目标,我们**每秒需要处理 2336Mb [1024*1024/3600*8] 的数据,除以 240约等于 10 台服务器**。
如果 消**息还需要额外复制两份,那么总的服务器台数还要乘以 3即 30 台**。
- Kafka 性能测试脚本
- kafka-producer-perf-test 脚本还不错kafka-consumer-perf-test有点难用
- 混部
- 一个机器上 部署一个zookeeper和一个kafka, 如果配置好**ZooKeeper事务日志**(比如设置好autopurge.purgeInterval及定期删除 snapshot等)它对IO的需求不大混布也是可以的。
---
- 最最最重要的集群参数配置
- Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题(Topic)级别的参数、JVM 端参数和操作系统级别的参数
- Broker 端参数
- 首先 Broker 是需要配置存储信息的,即 Broker 使用哪些 磁盘。那么针对存储信息的重要参数有以下这么几个:
- **log.dirs**: 这是非常重要的参数,指定了 Broker 需要 使用的若干个文件目录路径。要知道这个参数是没有默认 值的,这说明什么?这说明它**必须由你亲自指定**。
- log.dir: 注意这是 dir结尾没有 s说明它只能表示 单个路径,它是**补充上一个参数用的**。
- 这两个参数应该怎么设置呢?很简单,你只要设置 log.dirs即第一个参数就好了不要设置log.dir。而且更重要的是在线上生产环境中一定要为log.dirs配置 多个路径,
具体格式是一个 CSV 格式,也就是用逗号分隔 的多个路径,比 如/home/kafka1,/home/kafka2,/home/kafka3这 样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。
- 这样做有两个好处:
- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新 引入的强大功能。要知道在以前,只要 Kafka Broker 使 用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。 但是自 1.1 开始,这种情况被修正了,
坏掉的磁盘上的数 据会自动地转移到其他正常的磁盘上,而且 Broker 还能 正常工作。这个改进正是我们舍弃 RAID 方案的基 础:没有这种 Failover 的话,我们只能依靠 RAID 来提 供保障。
- 与 ZooKeeper 相关的设置
- 首先 ZooKeeper 是 做什么的呢?它是一个分布式协调框架,负责协调管理并保 存 Kafka 集群的所有元数据信息比如集群都有哪些Broker 在运行、创建了哪些 Topic每个 Topic 都有多少 分区以及这些分区的 Leader 副本都在哪些机器上等信息。
- Kafka 与 ZooKeeper 相关的最重要的参数当属 **zookeeper.connect**
- 这也是一个 CSV 格式的参数,比 如我可以指定它的值为 **zk1:2181,zk2:2181,zk3:2181**。2181 是 ZooKeeper 的默认端口。
- 如果我让**多个 Kafka 集群使用同一套 ZooKeeper 集群**,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概 念,类似于别名。
- 如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2那么两套集群的zookeeper.connect参数可以 这样指定:**zk1:2181,zk2:2181,zk3:2181/kafka1和 zk1:2181,zk2:2181,zk3:2181/kafka2**。
- **切记 chroot 只需要写一次,而且是加到最后的**。我经常碰到有人这样指 定: zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3这样的格式是不对的。
- 第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参 数:
- **listeners**: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
- advertised.listeners: 和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的, 就是说**这组监听器是 Broker 用于对外发布的**。
- [内网环境访问Kafka不需要配置这个参数常见的玩法是:你的Kafka Broker机器上配置了双网卡一 块网卡用于内网访问(即我们常说的内网IP);另一个块用 于外网访问。那么你可以配置listeners为内网IP advertised.listeners为外网IP。]
- ~~host.name/port~~: 列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
- 监听器的概念,从构成上来说,它是若干个逗 号分隔的三元组,**每个三元组的格式为<协议名称,主机 名,端口号>**。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密 传输等;
也可能是你自己定义的协议名字,比如 CONTROLLER: //localhost:9092。
- 一旦你自己定义了协议名称,你必须还要指定 listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议比如指定 listener.security.protocol.map=CONTROLLER:P LAINTEXT表示CONTROLLER这个自定义协议底层使用明文 不加密传输数据。
- 经常有人会问主机 名这个设置中我到底使用 IP 地址还是主机名。这里我给出 统一的建议:**最好全部使用主机名**,即 **Broker 端和 Client 端应用配置中全部填写主机名**。 Broker 源代码中也使用的 是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
- 第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数:
- auto.create.topics.enable:是否允许自动创建 Topic。
- [建议最好设置成 **false**,即不允许自动创建 Topic。每个部门被分配的 Topic 应该由运维严格把控,决不能允许自行创建任何 Topic。]
- unclean.leader.election.enable:是否允许 Unclean Leader 选举。
- [每个分区都有多个副本来提供高可用。 在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。建议你还是显式地把它设置成 **false** 吧]
- auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。
- [对生产环境影响非常大,设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满 足一定的条件才会发生。严格来说它与上一个参数中Leader 选举的最大不同在于,
它不是选 Leader而是换 Leader!比如 Leader A 一直表现得很好,但若 true则有可能 一段时间后 Leader A 就要被强行卸任换成 Leader B。建议你在生产环境中把这 个参数设置成 **false**。]
- 最后一组参数是数据留存方面的,即:
- log.retention.{hour|minutes|ms}:这是个“三 兄弟”,都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。
- [虽然 ms 设置有最高的优先级,但是 通常情况下我们还是设置 hour 级别的多一些,比如 log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使 用,那么这个值就要相应地调大。]
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
- [这个值默认是 -1 表明你想在这台 Broker 上保存多少数据都可以,至少在容 量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参 数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集 群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间]
- message.max.bytes:控制 Broker 能够接收的最大消息大小。
- [不能使用默认值的参数,默认的 1000012 太少了,还不到 1MB。实际场景中突 破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一 个比较大的值还是比较保险的做法。毕竟它只是一个标尺而 已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置 大一点也不会耗费什么磁盘空间的。]
- 补充参数
- gg.handler.kafkahandler.Mode = tx gg.handler.kafkahandler.Mode = op 这两个的差别。遇到时 dml 数据会丢失的情况。用的是 op 。
- 当设置成op单个数据库表的变更(插入、更新、删除) 会被当成一条Kafka消息发送;当设置成tx时数据库事务 所做的所有变更统一被封装进一条Kafka消息并在事务提 交后被发送。后者有事务性的保障,至少有原子性方面的保证,不会丢失部分 CDC 数据。
- Topic 级别参数
- Topic 级别参数会**覆盖全局 Broker 参数的值**,而每个 Topic 都能设置自己的参数值, 这就是所谓的 Topic 级别参数。
- **retention.ms**: 规定了该 Topic 消息被保存的时长。 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦 设置了这个值,它会覆盖掉 Broker 端的全局参数值。
- **retention.bytes**:规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1表示可以 无限使用磁盘空间。
- Topic 级别 参数的设置就是这种情况,我们有两种方式可以设置:
- 创建 Topic 时进行设置
- 修改 Topic 时设置
- 如何在创建 Topic 时设置这些参数
- 你的部门需要将交易数据发送到 Kafka 进行处理,需要保 存最近半年的交易数据,同时这些数据很大,通常都有几 MB但一般不会超过 5MB。现在让我们用以下命令来创建 Topic:
- bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replication-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880
- 请注意结尾处的--config设置我们就是在 config 后面指定了想要设置的 Topic 级别参数。
- 使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:
- bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760
- [你最好始终坚持使用第二种方式来设置并且在未来Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。]
- JVM 参数
- Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。
- 有条件的话至少使用 Java 8 吧, [Kafka 2.0已经不支持Java 7了2.1版本开始初步支持Java 11但不建议生产环境用11所以还是使用Java 8吧。]
- 堆大小这个参数至关重要。无脑给出一个通用的建议:将你的 JVM 堆大小设置成 **6GB** 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka说实话默认的 1GB 有点小,
毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例Heap Size 不能太小。
- [没有通用的标准只有一个最佳实践值6GB。最好还是监控一下实时的堆大小特别是**GC之后的live data大小通常将heapsize设置成其1.5~2倍就足以了**]
- JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7那么可以根据以下法则选择合适的垃圾回收器
- 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
- 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
- 当然了,如果你已经在使用 Java 8 了[G1是jdk9中默认的jdk8还是需要显式指定的]。在没有任何调优的情况下G1 表现得要比 CMS 出色,主要体现在更少的 Full GC需要调整的参数更少等所以使用 G1 就好了。
- Java8默认的新生代垃圾回收器是UseParallelGC可以用-XX:+PrintCommandLineFlags -version查看如果显示指定 -XX:+UseCurrentMarkSweepGC 的话,会默认开启 -XX:+UseParallelGC
- 确定好了要设置的 JVM 参数,我们该如何为 Kafka 进行设置呢?
- 只需要设置下面这两个环境变量即可:
- KAFKA_HEAP_OPTS指定堆大小。
- KAFKA_JVM_PERFORMANCE_OPTS指定 GC 参数。
- 比如你可以这样启动 Kafka Broker即在启动 Kafka Broker 之前,先设置上这两个环境变量:
- $> **export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g**
- $> **export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true**
- $> **bin/kafka-server-start.sh config/server.properties**
- 操作系统参数
- Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个:
- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间
- 首先是 **ulimit -n**
- 觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如 **ulimit -n 1000000**
- 但不设置的话后果很严重比如你会经常看到“Too many open files”的错误。
- 其次是文件系统类型的选择。
- 这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,**XFS** 的性能要强于 ext4所以生产环境最好还是使用 XFS。对了最近有个 Kafka 使用 **ZFS** 的数据报告,貌似性能更加强劲,有条件的话不妨一试。
- 第三是 swap 的调优
- 网上很多文章都提到设置其为 0将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0当物理内存耗尽时操作系统会触发 OOM killer 这个组件,
它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,
我**个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1**。
### 1.5 kafka的基本操作
```shell
# 1、启动Kafka - todo 这个命令会让控制台有日志生成不太方便
bin/kafka-server-start.sh config/server.properties &
# 2、停止Kafka
bin/kafka-server-stop.sh
# 3、创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiangzh-topic
# 4、查看已经创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 5、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.220.128:9092 --topic jiangzh-topic
# 6、接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic jiangzh-topic --from-beginning
# {"orderId":"002","price":"80"}
```
```shell
# 【旧版】
kafka-topics.sh --zookeeper 172.16.26.183:2181 --list
# 【新版】
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --list
# zookeeper is not a recognized option主要原因是 Kafka 版本过高,命令不存在)
# 创建topic主题
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --create --topic topic1 --partitions 1 --replication-factor 3
# --create 命令后 --topic 为创建topic 并指定 topic name
# --partitions 为指定分区数量
# --replication-factor 为指定副本集数量
# 向kafka集群发送数据
# 【无key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1
# 【有key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --property parse.key=true
# 默认消息键与消息值间使用“Tab键”进行分隔切勿使用转义字符(\t)
# kafka命令接受数据
kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning
# kafka查看消费进度当我们需要查看一个消费者组的消费进度时则使用下面的命令(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】
kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1
```
## 2. 客户端实践及原理剖析
### 2.1 生产者消息分区机制原理剖析
- 为什么分区?
- Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
- 主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
- 为什么使用分区的概念而不是直接使用多个主题呢?
- 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了**实现系统的高伸缩性Scalability**
- 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能**独立地执行各自分区的读写请求处理**。
- 并且,我们还可以通过**添加新的节点机器来增加整体系统的吞吐量**。
- 实际上分区的概念以及分区数据库早在 1980 年就已经有大牛们在做了,比如那时候有个叫 Teradata 的数据库就引入了**分区的概念**。
- Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard而在 HBase 中则叫 Region在 Cassandra 中又被称作 vnode
- 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如**实现业务级别的消息顺序的问题**
- 都有哪些分区策略?
- 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
- 如果要自定义分区策略你需要显式地配置生产者端的参数partitioner.class
- 在编写生产者程序时你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。
- 这个接口也很简单只定义了两个方法partition()和close(),通常你只需要实现最重要的 partition 方法。
- 方法签名int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- 这里的topic、key、keyBytes、value和valueBytes都属于消息数据cluster则是集群信息比如当前 Kafka 集群共有多少主题、多少 Broker 等)
- Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
- 只要你自己的**实现类定义好了 partition 方法同时设置partitioner.class参数为你自己实现类的 Full Qualified Name**,那么生产者程序就会按照你的代码逻辑对消息进行分区。
- 比较常见的分区策略
- **轮询策略**
- 也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0第二条被发送到分区 1第三条被发送到分区 2以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0就像下面这张图展示的那样。
- ![轮询策略](pic/轮询策略.png)
- 这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API **默认提供的分区策略**。如果你未指定partitioner.class参数那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
- [轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。]
- **随机策略**
- 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
- ![随机策略](pic/随机策略.png)
- 如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return ThreadLocalRandom.current().nextInt(partitions.size());
- 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
- 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以**如果追求数据的均匀分布,还是使用轮询策略比较好**。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
- **按消息键保序策略**
- 也称 Key-ordering 策略。这个名词是我自己编的Kafka 官网上并无这样的提法。
- Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。
特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,
**由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略**,如下图所示。
- ![按消息键保序策略](pic/按消息键保序策略.png)
- 实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return Math.abs(key.hashCode()) % partitions.size();
- 前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:**如果指定了 Key那么默认实现按消息键保序策略如果没有指定 Key则使用轮询策略。**
- 如何实现消息的顺序问题?
- 企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。
- 当时企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。
这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。
- 后来经过了解和调研,我发现这种具有因果关系的消息都有一定的特点,比如在**消息体中都封装了固定的标志位**,改进建议他们**对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区**
这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。
- 这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把**标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想**。
- 其他分区策略
- 其实还有一种比较常见的,即所谓的**基于地理位置的分区策略**。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
- 假设的所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在考虑在南方找个城市(比如广州)再创建一个机房;
另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在广州。
- 计划为每个新注册用户提供一份注册礼品,比如南方的用户可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。
如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
- 但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。
换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!
- 此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
- 我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。
- 补充问题
- 在消息重试的时候分区策略会重新再计算一次吗比如一开始选择到5号分区但是5号分区有问题导致重试重试的时候可以重试发送到别的分区上吗
- 不会的。消息重试只是简单地将消息重新发送到之前的分区
- 在看kafka-client生产者默认分区源码时看到了cluster.partitionsForTopic和cluster.availablePartitionsForTopic请问什么时候分区是available什么时候是不unavailable的
- 分区没有leader的时候就是unavailable了。某些操作会导致瞬间没有leader比如分区reassign、换leader等
- RocketMQ与Kafka的主要区别
- Kafka吞吐量大多是面向大数据场景。RocketMQ吞吐量也很强 不过它号称是金融业务级的消息中间件,也就是说可以用于实际的业务系统;
- RocketMQ毕竟是阿里出品在国内技术支持力度要比Kafka强
- Kafka现在主要发力StreamingRocketMQ在流处理这块表现如何我不太清楚至少streaming不是它现阶段的主要卖点。
- 其他方面这两者确实都差不多
- 广州机房怎么消费广州partition的数据consumer如何指定消费的partition。
- 使用这个方法consumer.assign()直接消息指定分区
### 2.2 生产者压缩算法面面观
- 说起压缩compression我相信你一定不会感到陌生。它秉承了**用时间去换空间的经典 trade-off 思想**,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。
- 怎么压缩?
- Kafka 是如何压缩消息的呢?要弄清楚这个问题,就要从 Kafka 的消息格式说起了。目前 **Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本**。V2 版本是 Kafka 0.11.0.0 中正式引入的。
- 不论是哪个版本Kafka 的消息层次都分为两层:**消息集合message set以及消息message**。**一个消息集合中包含若干条日志项record item而日志项才是真正封装消息的地方**。
Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka **通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作**。
- V2 版本主要是针对 V1 版本的一些弊端做了修正
- 一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。
- 在 V2 版本中,消息的 CRC 校验工作就被移到了消息集合这一层。
- V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。
- 之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。
- V2 版本都比 V1 版本节省磁盘空间。
- 何时压缩?
- 在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
- 生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。
- 下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
```
- 这里比较关键的代码行是 props.put(“compression.type”, “gzip”),它表明该 Producer 的压缩算法使用的是 GZIP。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
- 在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?其实大部分情况下 **Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改**,但这里的“大部分情况”也是要满足一定条件的。
- 有两种例外情况就可能让 Broker 重新压缩消息。
- 情况一Broker 端指定了和 Producer 端不同的压缩算法。
- Producer 说:“我要使用 GZIP 进行压缩。”
- Broker 说:“不好意思,我这边接收的消息必须使用 Snappy 算法进行压缩。”
- Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type和上面那个例子中的同名。但是这个**参数的默认值是 producer**,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。
可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,**通常表现为 Broker 端 CPU 使用率飙升**。
- 情况二Broker 端发生了消息格式转换。
- 在一个生产环境中Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,
它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
- 所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。如果有兴趣你可以深入地了解下 Zero Copy 的原理。
- 何时解压缩?
- 通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时Broker 依然原样发送出去,**当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息**。
- Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。**Kafka 会将启用了哪种压缩算法封装进消息集合中**,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
- 住这句话:**Producer 端压缩、Broker 端保持、Consumer 端解压缩。**
- 除了在 Consumer 端解压缩Broker 端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,**目的就是为了对消息执行各种验证**。
我们必须承认**这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言**。
- 国内京东的小伙伴们刚刚向社区提出了一个 bugfix建议去掉因为做消息校验而引入的解压缩。据他们称**去掉了解压缩之后Broker 端的 CPU 使用率至少降低了 50%**。
- 目前社区并未采纳这个建议,原因就是这种消息校验是非常重要的,不可盲目去之。毕竟先把事情做对是最重要的,**在做对的基础上,再考虑把事情做好做快**。
- 你也可以思考一下,是否有一个两全其美的方案,既能避免消息解压缩也能对消息执行校验。
- 各种压缩算法对比
- 在 Kafka 2.1.0 版本之前Kafka 支持 3 种压缩算法:**GZIP、Snappy 和 LZ4**。从 2.1.0 开始Kafka 正式支持 **Zstandard** 算法(简写为 zstd。它是 Facebook 开源的一个压缩算法能够提供超高的压缩比compression ratio
- 对了,看一个压缩算法的优劣,有两个重要的指标:
- 一个指标是**压缩比**,原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5显然压缩比越高越好
- 另一个指标就是**压缩 / 解压缩吞吐量**,比如每秒能压缩或解压缩多少 MB 的数据。同样地,吞吐量也是越高越好。
- Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:
- ![Zstandard压缩算法对比](pic/Zstandard压缩算法对比.png)
- zstd 算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩。
- 反观 LZ4 算法,它在吞吐量方面则是毫无疑问的执牛耳者。
- 在实际使用中GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。
- 但对于 Kafka 而言,它们的性能测试结果却出奇得一致,
- 即**在吞吐量方面LZ4 > Snappy > zstd 和 GZIP**
- 而**在压缩比方面zstd > LZ4 > GZIP > Snappy**。
- 具体到**物理资源**,使用 **Snappy 算法占用的网络带宽最多zstd 最少**,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
-**CPU 使用率**方面,各个算法表现得差不多,**只是在压缩时 Snappy 算法使用的 CPU 较多一些**
- 而**在解压缩时 GZIP 算法则可能使用更多的 CPU**。
- 最佳实践
- 首先来说压缩。何时启用压缩是比较合适的时机呢?
- 你现在已经知道 Producer 端完成的压缩,那么启用压缩的一个条件就是 **Producer 程序运行机器上的 CPU 资源要很充足**。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
- 除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。
- 事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,**带宽可是比 CPU 和内存还要珍贵的稀缺资源**,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。
- **如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗**。
- 其次说说解压缩。其实也没什么可说的。
- 一旦启用压缩,解压缩是不可避免的事情。
- 这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。
- 补充问题
- 正常情况下broker端会原样保存起来但是为了检验需要解压缩。该怎么去理解这个过程呢broker端解压缩以后还会压缩还原吗
- 它只是解压缩读取而已,不会将解压缩之后的数据回写到磁盘。另外就像我置顶的留言那样,目前社区已经接纳了京东小伙伴的修改,貌似可以绕过这部分解压缩了.
- 规避了broker端为执行校验而做的解压缩操作代码也merge进了2.4版本。有兴趣的同学可以看一下:
- https://issues.apache.org/jira/browse/KAFKA-8106
- 消息层次、消息集合、消息、日志项这些概念与它们之间的关系
- 消息批次RecordBatch里面包含若干条消息record)。 你可以认为消息批次和消息集合是等价的,消息和日志项是等价的。这样消息层次有两层:外层是消息批次(或消息集合);里层是消息(或日志项)。
Producer以recordbatch为单位发送消息对于V2版本一个batch中通常包含多条消息。在V2版本中在batch层面计算CRC值在V1版本中每条消息都要计算CRC值。
### 2.3 无消息丢失配置怎么实现?
- 一句话概括Kafka 只对**“已提交”的消息committed message做有限度的持久化保证**。
- 第一个核心要素是 **“已提交的消息”**。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
- 那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况Kafka 只对已提交的消息做持久化保证这件事情是不变的。
- 第二个核心要素就是 **“有限度的持久化保证”**
- 假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中**至少有 1 个存活**。只要这个条件成立Kafka 就能保证你的这条消息永远不会丢失。
- “消息丢失”案例
- 案例 1生产者程序丢失数据
- 目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API那么它通常会立即返回但此时你不能认为消息发送已成功完成。
- 这种发送方式有个有趣的名字叫“fire and forget”翻译一下就是“发射后不管”。它的意思是执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”
- 如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?
- 其实原因有很多,例如**网络抖动,导致消息压根就没有发送到 Broker 端**;或者**消息本身不合格导致 Broker 拒绝接收**(比如消息太大了,超过了 Broker 的承受能力)
- 就像前面说过的Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。
- 解决此问题的方法非常简单:
- Producer **永远要使用带有回调通知的发送 API**,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
- (回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
- 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。
- 总之,**处理发送失败的责任在 Producer 端而非 Broker 端。**
- 案例 2消费者程序丢失数据
- Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。
- ![Consumer端的位移数据](pic/Consumer端的位移数据.png)
- 比如对于 Consumer A 而言,它当前的位移值就是 9Consumer B 的位移值是 11。
- 这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。
- 正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。**不能颠倒**
- 要对抗这种消息丢失,办法很简单:**维持先消费消息(阅读),再更新位移(书签)的顺序即可**。这样就能最大限度地保证消息不丢失。
- 当然,**这种处理方式可能带来的问题是消息的重复处理**,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。
- 还存在一种比较隐蔽的消息丢失场景。
- 对于 Kafka 而言, Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。
假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
- 这里的**关键在于 Consumer 自动提交位移**
- 这个问题的解决方案也很简单:
- 如果是多线程异步处理消费消息Consumer 程序不要开启自动提交位移,而是要应用程序**手动提交位移**。
- 单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。
- 最佳实践
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,**一定要使用带有回调通知的 send 方法**。
- **设置 acks = all**。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- **设置 retries 为一个较大的值**。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- **设置 unclean.leader.election.enable = false**。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader必然会造成消息的丢失。
故一般都要将该参数设置成 false即不允许这种情况的发生。
- **设置 replication.factor >= 3**。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- **设置 min.insync.replicas > 1**。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等那么只要有一个副本挂机整个分区就无法正常工作了。我们不仅要改善消息的持久性防止数据丢失还要在不降低可用性的基础上完成。
**推荐设置成 replication.factor = min.insync.replicas + 1**。
- 确保消息消费完成再提交。Consumer 端有个参数 **enable.auto.commit最好把它设置成 false**,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
- 补充
- 设置 acks = all。表明所有副本 Broker 都要接收到消息该消息才算是“已提交”。如果所有的Broker都要收到消息才能算作已提交会不会对系统的吞吐量影响很大另外这里的副本指的是不是仅仅是ISR?
- 碰到的实际场景影响还是很大的。acks=all时大部分的请求处理延时都花在了follower同步上。 是的acks=all表明所有ISR中的副本都要同步。
### 2.4 客户端都有哪些不常见但是很高级的功能?
- 什么是拦截器?
- 基本思想就是允许应用程序在**不修改逻辑的情况下****动态地实现**一组可**插拔的事件处理逻辑链**。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。
- Kafka 拦截器
- Kafka **拦截器分为生产者拦截器和消费者拦截器**。
- 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。
值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,**Kafka 会按照添加顺序依次执行拦截器逻辑**。
- Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 **interceptor.classes**,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。
- 假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor
那么你需要按照以下方法在 Producer 端指定拦截器:
```java
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
```
- 我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢?
- 这两个类以及你自己编写的所有 Producer 端拦截器实现类都**要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口**。该接口是 Kafka 提供的,里面有两个核心的方法。
- onSend该方法会在消息发送之前被调用。如果你想在**发送之前**对消息“美美容”,这个方法是你唯一的机会。
- onAcknowledgement该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?**onAcknowledgement 的调用要早于 callback 的调用**。
值得注意的是,**这个方法和 onSend 不是在同一个线程中被调用的**,因此如果你在这两个方法中调用了某个共享可变对象,**一定要保证线程安全哦**。
还有一点很重要,这个方法处在 Producer 发送的主路径中,所以**最好别放一些太重的逻辑进去**,否则你会发现你的 Producer TPS 直线下降。
- 同理,指定消费者拦截器也是同样的方法,只是具体的实现类要**实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口**,这里面也有两个核心方法。
- onConsume该方法在消息返回给 Consumer 程序**之前**调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
- onCommitConsumer 在**提交位移之后调用**该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
- 一定要注意的是,指定拦截器类时要指定它们的**全限定名**,即 full qualified name。通俗点说就是要把完整包名也加上不要只有一个类名在那里并且还要保证你的 Producer 程序能够正确加载你的拦截器类。
- 典型使用场景
- 其实,跟很多拦截器的用法相同,**Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计**等多种功能在内的场景。
- Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去**追踪集群间消息的流转路径**。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。
- 通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够**从具体的消息层面上去收集这些数据**。这就是 Kafka 拦截器的一个非常典型的使用场景。
- 我们再来看看消息审计message audit的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要**涉及多租户以及消息审计的功能**。
- 作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。
一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。
- 案例分享
- 通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。
- 某个业务只有一个 Producer 和一个 Consumer他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少但是目前 Kafka 并没有提供这种端到端的延时统计。
- 既然是要计算总延时,**那么一定要有个公共的地方来保存它**,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中。
- 实现生产者拦截器
```java
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
```
- 上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。
- 消费者端的拦截器实现,代码如下:
```java
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
```
- 在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用**当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中**。
之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。
- 创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。
这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。
### 2.5 Java生产者是如何管理TCP连接的
- 为何采用 TCP
- 从社区的角度来看,在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如**多路复用请求以及同时轮询多个连接**的能力。
- 所谓的多路复用请求,即 multiplexing request是指将两个或多个数据流合并到底层单一物理连接中的过程。**TCP 的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流**。
其实严格来说,**TCP 并不能多路复用,它只是提供可靠的消息交付语义保证**,比如自动重传丢失的报文。
- 作为一个基于报文的协议TCP 能够被用于多路复用连接场景的前提是,上层的应用协议(比如 HTTP允许发送多条消息。
- 除了 TCP 提供的这些高级功能有可能被 Kafka 客户端的开发人员使用之外,社区还发现,**目前已知的 HTTP 库在很多编程语言中都略显简陋**。
- 基于这两个原因Kafka 社区决定采用 TCP 协议作为所有请求通信的底层协议。
- Kafka 生产者程序概览
- Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。
- 第 1 步:构造生产者对象所需的参数对象。
- 第 2 步:利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
- 第 3 步:使用 KafkaProducer 的 send 方法发送消息。
- 第 4 步:调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。
- 代码示例:
```java
Properties props = new Properties ();
props.put("参数 1", "参数 1 的值")
props.put("参数 2", "参数 2 的值")
//
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<String, String>(……), callback);
//
}
```
- 这段代码使用了 Java 7 提供的 try-with-resource 特性,所以并没有显式调用 producer.close() 方法。无论是否显式调用 close 方法,所有生产者程序大致都是这个路数。
- 当我们开发一个 Producer 应用时,生产者会向 Kafka 集群中指定的主题Topic发送消息这必然涉及与 Kafka Broker 创建 TCP 连接。那么Kafka 的 Producer 客户端是如何管理这些 TCP 连接的呢?
- 何时创建 TCP 连接?
- 生产者代码是什么时候创建 TCP 连接的。就上面的那段代码而言,可能创建 TCP 连接的地方有两处Producer producer = new KafkaProducer(props) 和 producer.send(msg, callback)。你觉得连向 Broker 端的 TCP 连接会是哪里创建的呢?前者还是后者,抑或是两者都有?
- **在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。**
- 测试环境中的日志来说明这一点:
```txt
[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)
```
- 如果不调用 send 方法,这个 Producer 都不知道给哪个主题发消息,它又怎么能知道连接哪个 Broker 呢?难不成它会连接 bootstrap.servers 参数指定的所有 Broker 吗?
- 解释一下 bootstrap.servers 参数。它是 Producer 的核心参数之一,指定了这个 Producer 启动时要连接的 Broker 地址。
- 这里的“启动时”,代表的是 Producer 启动时会发起与这些 Broker 的连接。因此,如果你为这个参数指定了 1000 个 Broker 连接信息,那么很遗憾,你的 **Producer 启动时会首先创建与这 1000 个 Broker 的 TCP 连接**
- 在实际使用过程中,我并**不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中**,通常你指定 **34 台就足以**了。因为 **Producer 一旦连接到集群中的任一台 Broker就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker**
- 在 KafkaProducer 实例被创建后以及消息被发送前Producer 应用就开始创建与两台 Broker 的 TCP 连接了。
- 你不需要了解 RecordAccumulator 类是做什么的,你只要知道它主要的数据结构是一个 ConcurrentMap<TopicPartition, Deque>。TopicPartition 是 Kafka 用来表示主题分区的 Java 对象,本身是不可变对象。而 RecordAccumulator 代码中用到 Deque 的地方都有锁的保护,所以基本上可以认定 RecordAccumulator 类是线程安全的。
- **TCP 连接是在创建 KafkaProducer 实例时建立的。**
- 它只会在这个时候被创建吗?
- 当然不是TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。
- 当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地当要发送消息时Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。
- Producer 更新集群元数据信息的两个场景
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000即 5 分钟也就是说不管集群那边是否有变化Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
- 何时关闭 TCP 连接?
- Producer 端关闭 TCP 连接的方式有两种:**一种是用户主动关闭;一种是 Kafka 自动关闭**。
- 第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close() 方法来关闭。
- 第二种是 Kafka 帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1TCP 连接将成为永久长连接。
当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive因此 keepalive 探活机制还是会遵守的。
- 在第二种方式中TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。
### 2.6 幂等生产者和事务生产者是一回事吗?
- Kafka 消息交付可靠性保障以及精确处理一次语义的实现。
- 所谓的消息交付可靠性保障,**是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺**。常见的承诺有以下三种:
- 最多一次at most once消息可能会丢失但绝不会被重复发送。
- 至少一次at least once消息不会丢失但有可能被重复发送。
- 精确一次exactly once消息不会丢失也不会被重复发送。
- 目前Kafka **默认提供的交付可靠性保障是第二种**,即至少一次。
- Kafka 也可以提供最多一次交付保障,**只需要让 Producer 禁止重试即可**。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。
- 那么问题来了Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:**幂等性Idempotence和事务Transaction**。
- 什么是幂等性Idempotence
- 幂等性有很多好处,其最大的优势在于**我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态**。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。
- 幂等性 Producer
- 在 Kafka 中Producer **默认不是幂等性的**,但我们可以创建幂等性 Producer。它其实是 **0.11.0.0 版本引入的新功能**
- 在此之前Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可
- **props.put(“enable.idempotence”, ture)**,或 **props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG true)**
- enable.idempotence 被设置成 true 后Producer **自动升级成幂等性 Producer**,其他所有的代码逻辑都不需要改变。
- **kafka 自动帮你做消息的重复去重**。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。
- 必须要了解幂等性 Producer 的作用范围
- 首先,它只能保证**单分区上的幂等性**,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
- 其次,它只能实现**单会话上的幂等性**,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
- 如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是**事务transaction或者依赖事务型 Producer**。这也是幂等性 Producer 和事务型 Producer 的**最大区别**
- 事务
- 经典的 ACID即原子性Atomicity、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。
- 隔离性表明并发执行的事务彼此相互隔离,互不影响。
- Kafka 自 **0.11 版本开始也提供了对事务的支持**,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
- 事务型 Producer
- 事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后Kafka 依然保证它们发送消息的精确一次处理。
- 设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 **enable.idempotence = true**
- **设置 Producer 端参数 transctional. id**。最好为其设置一个有意义的名字。
- 此外,你还需要在 Producer 代码中做一些调整,如这段代码所示:
```java
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
```
- 和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
- 这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka要么它们全部提交成功要么全部写入失败。实际上即使写入失败Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。
因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。
- 修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:
- read_uncommitted这是默认值表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer那么对应的 Consumer 就不要使用这个值。
- read_committed表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
- 比起幂等性 Producer事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。
- 补充
- 事务的使用场景
- 事务更多用在Kafka Streams中。如果要实现流处理中的精确一次语义事务是不可少的。
- 事务型 Producer 可以实现一组消息要么全部写入成功,要么全部失败,但是事务型 Producer 是具体怎么实现多分区以及多会话上的消息无重复的呢?
- 主要的机制是两阶段提交2PC。引入了事务协调器的组件帮助完成分布式事务
### 2.7 消费者组到底是什么?
- 用一句话概括就是Consumer Group 是 Kafka 提供的**可扩展且具有容错性的消费者机制**。
- 既然是一个组那么组内必然可以有多个消费者或消费者实例Consumer Instance它们共享一个公共的 ID这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题Subscribed Topics的所有分区Partition。当然每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。
- Consumer Group 记住下面这三个特性
- Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,**使用进程更为常见一些**。
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
- Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
- 两种消息引擎模型
- 点对点模型和发布 / 订阅模型,前者也称为消费队列。
- Kafka 的 Consumer Group 就是这样的机制。当 Consumer Group 订阅了多个主题后,**组内的每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息**。
- Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。再加上 Broker 端的消息留存机制Kafka 的 Consumer Group 完美地规避了上面提到的伸缩性差的问题。
- 可以这么说Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:
- 如果所有实例都属于同一个 Group那么它实现的就是消息队列模型如果所有实例分别属于不同的 Group那么它实现的就是发布 / 订阅模型。
- 在实际使用场景中,我怎么知道一个 Group 下该有多少个 Consumer 实例呢?
- **理想情况下Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。**
- 假设一个 Consumer Group 订阅了 3 个主题,分别是 A、B、C它们的分区数依次是 1、2、3那么通常情况下为该 Group 设置 6 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性。
- **在实际使用过程中一般不推荐设置大于总分区数的 Consumer 实例**。设置多余的实例只会浪费资源,而没有任何好处。
- 针对 Consumer GroupKafka 是怎么管理位移的呢?你还记得吧,消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在 Kafka 中这个位置信息有个专门的术语位移Offset
- 看上去该 Offset 就是一个数值而已,其实对于 Consumer Group 而言,它是一组 KV 对Key 是分区V 对应 Consumer 消费该分区的最新位移。
- 如果用 Java 来表示的话,你大致可以认为是这样的数据结构,即 Map<TopicPartition, Long>,其中 TopicPartition 表示一个分区,而 Long 表示位移的类型。
- Kafka 源码中并不是这样简单的数据结构,而是要比这个复杂得多
- Kafka 有新旧客户端 API 之分,那自然也就有新旧 Consumer 之分。老版本的 Consumer 也有消费者组的概念,它和我们目前讨论的 Consumer Group 在使用感上并没有太多的不同,只是**它管理位移的方式和新版本是不一样的**。
- 老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。
- **现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。**
- Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
- 发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。
- 于是,在新版本的 Consumer Group 中Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是让人既爱又恨的 __consumer_offsets。
- **新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中**。
- 最后,我们来说说 Consumer Group 端大名鼎鼎的重平衡,也就是所谓的 Rebalance 过程。我形容其为“大名鼎鼎”,从某种程度上来说其实也是“臭名昭著”,因为有关它的 bug 真可谓是此起彼伏,从未间断
- Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
- 比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
- 那么 Consumer Group 何时进行 Rebalance 呢Rebalance 的触发条件有 3 个。
- 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
- 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
- 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
- Rebalance 发生时Group 下所有的 Consumer 实例都会协调在一起共同参与。你可能会问,每个 Consumer 实例怎么知道应该消费订阅主题的哪些分区呢?这就需要分配策略的协助了。
- 当前 Kafka 默认提供了 3 种分配策略,每种策略都有一定的优势和劣势,我们今天就不展开讨论了
- 例子来说明一下 Consumer Group 发生 Rebalance 的过程。假设目前某个 Consumer Group 下有两个 Consumer比如 A 和 B当第三个成员 C 加入时Kafka 会触发 Rebalance并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:
- ![Rebalance的过程](pic/Rebalance的过程.png)
- 显然Rebalance 之后的分配依然是公平的,即每个 Consumer 实例都获得了 3 个分区的消费权。这是我们希望出现的情形。
- Rebalance 过程对 Consumer Group 消费过程有极大的影响
- 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
- 其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
- 例如实例 A 之前负责消费分区 1、2、3那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3而不是被重新分配其他的分区。这样的话实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
- 最后Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!
- 最好的解决方案就是避免 Rebalance 的发生。
- 补充
- 消费组中的消费者个数如果超过topic的分区数就会有消费者消费不到数据。但如果是同一个消费组里的两个消费者通过assign方法订阅了同一个TopicPartition是不是会有一个消费者不能消费到消息
- 如果使用assign则表明该consumer是独立consumerstandalone consumer它不属于任何消费者组。独立consumer可以订阅任何分区彼此之间也没有关系即两个独立consumer可以订阅并消费相同的分区
- 减少topic的分区数可以减少服务部署时rebalance的时间呢
- 减少consumer个数也有缩短rebalance。
- Kafka 社区重新设计了 Consumer Group 的位移管理方式”
- 0.9版本引入了新版本consumer。新版本将位移保存在__consumer_offsets中不需要额外配置。如果你想保存在外部的Zk集群中那么设置consumer端参数enable.auto.commit=false然后自己调用ZooKeeper API提交位移到Zk即可。
### 2.8 揭开神秘的“位移主题”面纱
- Kafka 中神秘的内部主题Internal Topic__consumer_offsets。
- __consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。为了方便今天的讨论我将统一使用位移主题来指代 __consumer_offsets。需要注意的是它有两个下划线哦。
- 新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,**__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息**。
- 它要求这个提交过程不仅要实现高持久性还要支持高频的写操作。显然Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。
- 位移主题就是普通的 Kafka 主题。你可以手动地创建它、修改它,甚至是删除它。只不过,它同时也是一个内部主题,大部分情况下,你其实并不需要“搭理”它,也不用花心思去管理它,把它丢给 Kafka 就完事了。
- 虽说位移主题是一个普通的 Kafka 主题,但它的**消息格式却是 Kafka 自己定义的** 用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。
- 事实上Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。你千万不要自己写个 Producer 随意向该主题发送消息。
- 首先从 Key 说起。
- 一个 Kafka 集群中的 Consumer 数量会有很多,既然这个主题保存的是 Consumer 的位移数据,那么消息格式中必须要有字段来标识这个位移数据是哪个 Consumer 的。这种数据放在哪个字段比较合适呢?显然放在 Key 中比较合适。
- Group ID ,它能够标识唯一的 Consumer Group。
- 除了 Consumer GroupKafka 还支持独立 Consumer也称 Standalone Consumer。它的运行机制与 Consumer Group 完全不同,但是位移管理的机制却是相同的。因此,即使是 Standalone Consumer也有自己的 Group ID 来标识它自己,所以也适用于这套消息格式。
- Key 中保存了 Group ID但是只保存 Group ID 就可以了吗别忘了Consumer 提交位移是在分区层面上进行的即它提交的是某个或某些分区的位移那么很显然Key 中还应该保存 Consumer 要提交位移的分区。
- 结论
- 位移主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >
- 消息体的设计
- 也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。
- 保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。
- 位移主题的消息格式可不是只有这一种。事实上,它有 3 种消息格式。除了刚刚我们说的这种格式,还有 2 种格式:
- 用于保存 Consumer Group 信息的消息。
- 用于删除 Group 过期位移甚至是删除 Group 的消息。
- 第 1 种格式非常神秘,以至于你几乎无法在搜索引擎中搜到它的身影。不过,你只需要记住它是用来注册 Consumer Group 的就可以了。
- 第 2 种格式相对更加有名一些。它有个专属的名字tombstone 消息,即墓碑消息,也称 delete mark。下次你在 Google 或百度中见到这些词,不用感到惊讶,它们指的是一个东西。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null即空消息体。
- 何时会写入这类消息呢?
- 一旦某个 Consumer Group 下的所有 Consumer 实例都停止了而且它们的位移数据都已被删除时Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。
- 位移主题是怎么被创建的。
- 通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时Kafka 会自动创建位移主题。我们说过,位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。
但如果是 Kafka 自动创建的,分区数是怎么设置的呢?这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50因此 Kafka 会自动创建一个 50 分区的位移主题。
如果你曾经惊讶于 Kafka 日志路径下冒出很多 __consumer_offsets-xxx 这样的目录,那么现在应该明白了吧,这就是 Kafka 自动帮你创建的位移主题啊。
- 位移主题是怎么被创建的。
- 通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时Kafka 会自动创建位移主题。我们说过,位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。但如果是 Kafka 自动创建的,分区数是怎么设置的呢?
这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50因此 Kafka 会自动创建一个 50 分区的位移主题。如果你曾经惊讶于 Kafka 日志路径下冒出很多 __consumer_offsets-xxx 这样的目录,
那么现在应该明白了吧,这就是 Kafka 自动帮你创建的位移主题啊。
- **如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50副本数是 3**。
- 你也可以选择手动创建位移主题。
- 具体方法就是,在 Kafka 集群尚未启动任何 Consumer 之前,使用 Kafka API 创建它。手动创建的好处在于,你可以创建满足你实际场景需要的位移主题。比如很多人说 50 个分区对我来讲太多了,我不想要这么多分区,那么你可以自己创建它,不用理会 offsets.topic.num.partitions 的值。
- **让 Kafka 自动创建比较好。目前 Kafka 源码中有一些地方硬编码了 50 分区数,因此如果你自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各种奇怪的问题。**
- 目前 Kafka Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。
- Consumer 端有个参数叫 enable.auto.commit如果值是 true则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。
但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。
- 很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false作为 Consumer 应用开发的你就要承担起位移提交的责任。
Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时Kafka 会向位移主题写入相应的消息。
- 如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。
- Kafka 是怎么删除位移主题中的过期消息的呢?
- 答案就是 Compaction。国内很多文献都将其翻译成压缩我个人是有一点保留意见的。在英语中压缩的专有术语是 Compression它的原理和 Compaction 很不相同,我更倾向于翻译成压实,或干脆采用 JVM 垃圾回收中的术语:整理。
- Kafka 使用Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2如果 M1 的发送时间早于 M2那么 M1 就是过期消息。
- Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。
- ![ Compact 过程]( pic/Compact过程.png)
- 图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。
- **Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据**。
- 这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题如果你的环境中也有这个问题我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
- 补充
- consumer端 日常业务发版呢那每次发版需要重启consumer不是也会导致Rebalance这个如何规避
- 可以考虑使用standalone consumer否则group机制无法避免
- 具体是从哪个版本开始位移数据开始默认的不存在zk而是存在自己内部了 如果验证目前使用的环境位移是存在内部还是zk上 什么场景下适合使用自动提交位移?
- 1.0.9 推荐保存在Kafka内部不在乎重复消费
- offset topic是在coordinator对应的broker上创建且只创建一次是么
- offset topic在整个集群上被创建出来并且只会创建一次
- consumer 是如何从这个位移主题中拿到曾经属于自己组的offset呢
- 首先找到对应的CoordinatorCoordinator保存了这些数据然后consumer向Coordinator发送请求去请求这些数据
### 2.9 消费者组重平衡能避免吗?
- Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。 但是,在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。
- 所谓协调者,在 Kafka 中对应的术语是 Coordinator它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。
- 具体来讲,**Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移**。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
- 所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,**所有 Broker 都有各自的 Coordinator 组件**。
- Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢? 答案Kafka 内部位移主题 __consumer_offsets 身上。
- 目前Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。
- 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
- 首先Kafka 会计算该 Group 的 group.id 参数的哈希值。有个 Group 的 group.id 设置成了“test-group”那么它的 hashCode 值就应该是 627841412
- 其次Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。
- 有了分区号,算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个 Broker 上就可以了。
- 第 2 步:找出该分区 Leader 副本所在的 Broker该 Broker 即为对应的 Coordinator。这个 Broker就是我们要找的 Coordinator。
- 在实际使用过程中Consumer 应用程序,特别是 Java Consumer API能够自动发现并连接正确的 Coordinator我们不用操心这个问题。知晓这个算法的最大意义在于它能够帮助我们解决定位问题。
- 当 Consumer Group 出现问题,需要快速排查 Broker 端日志时,我们能够根据这个算法准确定位 Coordinator 对应的 Broker不必一台 Broker 一台 Broker 地盲查。
- Rebalance 的弊端是什么呢?总结起来有以下 3 点:
- **Rebalance 影响 Consumer 端 TPS**。这个之前也反复提到了,这里就不再具体讲了。总之就是,在 Rebalance 期间Consumer 会停下手头的事情,什么也干不了。
- **Rebalance 很慢**。如果你的 Group 下成员很多,就一定会有这样的痛点。还记得我曾经举过的那个国外用户的例子吧?他的 Group 下有几百个 Consumer 实例Rebalance 一次要几个小时。在那种场景下Consumer Group 的 Rebalance 已经完全失控了。
- **Rebalance 效率不高**。当前 Kafka 的设计机制决定了每次 Rebalance 时Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。
- 关于第 3 点,我们来举个简单的例子。比如一个 Group 下有 10 个成员,每个成员平均消费 5 个分区。假设现在有一个成员退出了,此时就需要开启新一轮的 Rebalance把这个成员之前负责的 5 个分区“转移”给其他成员。
- 显然,比较好的做法是维持当前 9 个成员消费分区的方案不变,然后将 5 个分区随机分配给这 9 个成员,这样能最大限度地减少 Rebalance 对剩余 Consumer 成员的冲击。
- 遗憾的是,目前 Kafka 并不是这样设计的。在默认情况下,每次 Rebalance 时,之前的分配方案都不会被保留。就拿刚刚这个例子来说,当 Rebalance 开始时Group 会打散这 50 个分区10 个成员 * 5 个分区),由当前存活的 9 个成员重新分配它们。
- 基于这个原因,社区于 0.11.0.0 版本推出了 StickyAssignor即有粘性的分区分配策略。所谓的有粘性是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动。不过有些遗憾的是,这个策略目前还有一些 bug而且需要升级到 0.11.0.0 才能使用,因此在实际生产环境中用得还不是很多。
- 特别是针对 Rebalance 慢和影响 TPS 这两个弊端,社区有解决办法吗?
- 我可以很负责任地告诉你:“无解!”特别是 Rebalance 慢这个问题Kafka 社区对此无能为力。“本事大不如不摊上”,既然我们没办法解决 Rebalance 过程中的各种问题,干脆就避免 Rebalance 吧,特别是那些不必要的 Rebalance。
- 就我个人经验而言,在真实的业务场景中,很多 Rebalance 都是计划外的或者说是不必要的。我们应用的 TPS 大多是被这类 Rebalance 拖慢的,因此避免这类 Rebalance 就显得很有必要了。
- 要避免 Rebalance还是要从 Rebalance 发生的时机入手。我们在前面说过Rebalance 发生的时机有三个:
- 组成员数量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
- 后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。接下来,我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。
- **如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance**。这是 Rebalance 发生的最常见的原因。我碰到的 99% 的 Rebalance都是这个原因导致的。
- Consumer 实例增加的情况很好理解,**当我们启动一个配置有相同 group.id 值的 Consumer 程序时,实际上就向这个 Group 添加了一个新的 Consumer 实例**。
- 此时,**Coordinator 会接纳这个新实例,将其加入到组中,并重新分配分区**。通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。总之,它不属于我们要规避的那类“不必要 Rebalance”。
- 我们更在意的是 Group 下实例数减少这件事。如果你就是要停掉某些 Consumer 实例那自不必说关键是在某些情况下Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的 Rebalance我们就不能不管了。
- Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要退组呢?
- 当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。
- 如果某个 Consumer 实例不能及时地发送这些心跳请求Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。
- Consumer 端有个参数,叫 session.timeout.ms就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。
- 可以这么说,**session.timout.ms 决定了 Consumer 存活性的时间间隔**。
- 除了这个参数,**Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小Consumer 实例发送心跳请求的频率就越高**。
- 频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance因为目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。
- 除了以上两个参数Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。
它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求Coordinator 也会开启新一轮 Rebalance。
- 明确一下到底哪些 Rebalance 是“不必要的”。
- **第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的**。因此,你需要仔细地设置**session.timeout.ms 和 heartbeat.interval.ms**的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。
- **设置 session.timeout.ms = 6s**。
- 将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟我们还是希望能尽快揪出那些“尸位素餐”的 Consumer早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。
- **设置 heartbeat.interval.ms = 2s**。
- 要保证 Consumer 实例在被判定为“dead”之前能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
- **第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。**
- **max.poll.interval.ms**参数值的设置显得尤为关键。如果要避免非预期的 Rebalance你最好将该参数值设置得大一点比你的下游最大处理时间稍长一点。如果写的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。
- 总之你要为你的业务处理逻辑留下充足的时间。这样Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 了。
- 如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance那么我建议你去排查一下Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。
- 为什么特意说 GC那是因为在实际场景中我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。
- 补充
- Consumer 消费时间过长为啥会导致rebalance是不能及时发心跳 导致coordinator认为该consumer挂了吗
- consumer主动关闭会主动向Coordinator发送LeaveGroup请求从而让Coordinator第一时间开启rebalance
- 如果同一个group 的不同consumer 设置的session.timeout.ms 的不一样怎么办协调者以最后一个consumer 为准吗?
- 取最大的
- 在一个session.timeout.ms周期内如果consumer重启了relalance是否就可以避免
- consumer重启了rebalance就开始了
- 2.3修复了之前StickyAssignor的一个重大bug
- 每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求表明它还存活着。”这个是后台自动触发的还是每次主动poll消息触发的啊
- 0.10.1之前是在调用poll方法时发送的0.10.1之后consumer使用单独的心跳线程来发送
### 2.10 Kafka中位移提交那些事儿
- Consumer 需要向 Kafka 汇报自己的位移数据这个汇报过程被称为提交位移Committing Offsets。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 **Consumer 需要为分配给它的每个分区提交各自的位移数据**
- 提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
- 换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。
- 因为位移提交非常灵活,你完全可以提交任何位移值,但由此产生的后果你也要一并承担。
- 假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20那么从理论上讲位移介于 1119 之间的消息是有可能丢失的;
- 相反地,如果你提交的位移值是 5那么位移介于 59 之间的消息就有可能被重复消费。
- 所以我想再强调一下位移提交的语义保障是由你来负责的Kafka 只会“无脑”地接受你提交的位移。
- 你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。
- KafkaConsumer API提供了多种提交位移的方法。**从用户的角度来说,位移提交分为自动提交和手动提交****从 Consumer 端的角度来说,位移提交分为同步提交和异步提交**。
- 所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移作为用户的你完全不必操心这些事而手动提交则是指你要自己提交位移Kafka Consumer 压根不管。
- 开启自动提交位移的方法很简单。Consumer 端有个参数 enable.auto.commit把它设置为 true 或者压根不设置它就可以了。因为它的默认值就是 true即 Java Consumer 默认就是自动提交位移的。
- 如果启用了自动提交Consumer 端还有个参数就派上用场了auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。
- 这段代码展示了设置自动提交位移的方法。
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
```
- 和自动提交相反的,就是手动提交了。开启手动提交位移的方法就是设置 enable.auto.commit 为 false。但是仅仅设置它为 false 还不够,因为你只是告诉 Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。
- 最简单的 API 就是KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。
```java
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
```
- 可见,调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。如果你莽撞地过早提交了位移,就可能会出现消费数据丢失的情况。
- 那么你可能会问,自动提交位移就不会出现消费数据丢失的情况了吗?它能恰到好处地把握时机进行位移提交吗?为了搞清楚这个问题,我们必须要深入地了解一下自动提交位移的顺序。
- 一旦设置了 enable.auto.commit 为 trueKafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。
- 从顺序上来说poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,**它可能会出现重复消费**。
- 在默认情况下Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。
虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。**这是自动提交机制的一个缺陷**。
- 反观手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。
当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。
- 鉴于这个问题Kafka 社区为手动提交位移提供了另一个 API 方法KafkaConsumer#commitAsync()。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。
由于它是异步的Kafka 提供了回调函数callback供你实现提交之后的逻辑比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法:
```java
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
```
- commitAsync 是否能够替代 commitSync 呢答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。
- 显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:
- 我们可以利用 commitSync 的自动重试来规避那些瞬时错误比如网络的瞬时抖动Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
- 我们不希望程序总处于阻塞状态,影响 TPS。
- 我们来看一下下面这段代码,它展示的是如何将两个 API 方法结合使用进行手动提交。
```java
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch (Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
```
- 这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性
- 所以,如果你需要自行编写代码开发一套 Kafka Consumer 应用,那么我推荐你使用上面的代码范例来实现手动的位移提交。
- 实际上Kafka Consumer API 还提供了一组更为方便的方法,可以帮助你实现更精细化的位移管理功能。刚刚我们聊到的所有位移提交,都是提交 poll 方法返回的所有消息的位移,比如 poll 方法一次返回了 500 条消息,当你处理完这 500 条消息之后,前面我们提到的各种方法会一次性地将这 500 条消息的位移一并处理。
**简单来说,就是直接提交最新一条消息的位移**。但如果我想更加细粒度化地提交位移,该怎么办呢?
- 设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。
- 在 Kafka 中也是相同的道理。对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。
- 庆幸的是Kafka Consumer API 为手动提交提供了这样的方法:
- commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)。它们的参数是一个 Map 对象,键就是 TopicPartition即消费的分区而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。
- 就拿刚刚提过的那个例子来说,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例展示一段代码实际上commitSync 的调用方法和它是一模一样的。
```java
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
// ...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
count++;
}
}
```
- 简单解释一下这段代码。程序先是创建了一个 Map 对象,用于保存 Consumer 消费处理过程中要提交的分区位移,之后开始逐条处理消息,并构造要提交的位移值。还记得之前我说过要提交下一条消息的位移吗?这就是这里构造 OffsetAndMetadata 对象时,使用当前消息位移加 1 的原因。代码的最后部分是做位移的提交。
我在这里设置了一个计数器,每累计 100 条消息就统一提交一次位移。与调用无参的 commitAsync 不同,这里调用了带 Map 对象参数的 commitAsync 进行细粒度的位移提交。这样,这段代码就能够实现每处理 100 条消息就提交一次位移,不用再受 poll 方法返回的消息总数的限制了。
- 补充
- 对于手动同步和异步提交结合的场景如果poll出来的消息是500条而业务处理200条的时候业务抛异常了后续消息根本就没有被遍历过finally里手动同步提交的是201还是000还是501
- 如果调用没有参数的commit那么提交的是500
- poll下来的数据是有序的吗同一个partition中各个消息的相对顺序当然不同partition应该是不一定的
- 是的
- 老师好consumer的api在读取的时候能指定从某个partition的某个offset开始读取吗
- 可以控制使用KafkaConsumer.seek可以精确控制你要开始消费的位移
- 自动提交就一定能够保证不丢消息吗?
- 不能绝对保证
### 2.11 CommitFailedException异常怎么处理
- 所谓 CommitFailedException顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
- 如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的比如我们在上一期中提到的commitSync 方法。
- 每次和 CommitFailedException 一起出现的,还有一段非常著名的注释。
- 第一,我想不出在近 50 万行的 Kafka 源代码中,还有哪个异常类能有这种待遇,可以享有这么大段的注释,来阐述其异常的含义;
- 第二,纵然有这么长的文字解释,却依然有很多人对该异常想表达的含义感到困惑。
- 现在,我们一起领略下这段文字的风采,看看社区对这个异常的最新解释:
> Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
> This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time message processing.
> **You can address this either by increasing max.poll.interval.**
> **ms or by reducing the maximum size of batches returned in poll() with max.poll.records.**
- 这段话前半部分的意思是,本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。
- 出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。
- 在后半部分,社区给出了两个相应的解决办法(即橙色字部分):
- 增加期望的时间间隔 max.poll.interval.ms 参数值。
- 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。
- 在详细讨论这段文字之前,我还想提一句,实际上这段文字总共有 3 个版本,除了上面的这个最新版本,还有 2 个版本,它们分别是:
> Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
> **This means that the time between subsequent calls to poll() was longer than the configured session.timeout.**
> **ms**, which typically implies that the poll loop is spending too much time message processing.
> You can address this either by **increasing the session timeout** or by reducing the maximum size of batches returned in poll() with max.poll.records.
> Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
> **This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.**
> **ms**, which typically implies that the poll loop is spending too much time message processing.
> You can address this either by **increasing the session timeout** or by reducing the maximum size of batches returned in poll() with max.poll.records.
- 其实不论是哪段文字,它们都表征位移提交出现了异常。
- 下面我们就来讨论下该异常是什么时候被抛出的。
- 从源代码方面来说CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。
- 从使用场景来说,有两种典型的场景可能遭遇该异常。
- 场景一
- 我们先说说最常见的场景。当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时Kafka Consumer 端会抛出 CommitFailedException 异常。
这是该异常最“正宗”的登场方式。你只需要写一个 Consumer 程序,使用 KafkaConsumer.subscribe 方法随意订阅一个主题,之后设置 Consumer 端参数 max.poll.interval.ms=5 秒,
最后在循环调用 KafkaConsumer.poll 方法之间,插入 Thread.sleep(6000) 和手动提交位移,就可以成功复现这个异常了。在这里,我展示一下主要的代码逻辑。
```java
Properties props = new Properties();
props.put("max.poll.interval.ms", 5000);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
// 使用 Thread.sleep 模拟真实的消息处理逻辑
Thread.sleep(6000L);
consumer.commitSync();
}
```
- 如果要防止这种场景下抛出异常,你需要简化你的消息处理逻辑。具体来说有 4 种方法。
- **缩短单条消息处理的时间**。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
- **增加 Consumer 端允许下游系统消费一批消息的最大时长**。这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。
如果你的消费逻辑不能简化那么提高该参数值是一个不错的办法。值得一提的是Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API
那么你需要增加 session.timeout.ms 参数的值。不幸的是session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,
这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。
- **减少下游系统一次性消费的消息总数**。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,
最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
- **下游系统使用多线程来加速消费**。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。
之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。
如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,
很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。
不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。在专栏后面的内容中,我将着重和你讨论一下多线程消费的实现方案。
- 综合以上这 4 个处理方法,**我个人推荐你首先尝试采用方法 1 来预防此异常的发生**。优化下游系统的消费逻辑是百利而无一害的法子,不像方法 2、3 那样涉及到 Kafka Consumer 端 TPS 与消费延时Latency的权衡。如果方法 1 实现起来有难度,那么你可以按照下面的法则来实践方法 2、3。
- 首先,你需要弄清楚你的下游系统消费每条消息的平均延时是多少。比如你的消费逻辑是从 Kafka 获取到消息后写入到下游的 MongoDB 中,假设访问 MongoDB 的平均延时不超过 2 秒,那么你可以认为消息处理需要花费 2 秒的时间。
如果按照 max.poll.records 等于 500 来计算,一批消息的总消费时长大约是 1000 秒,因此你的 Consumer 端的 max.poll.interval.ms 参数值就不能低于 1000 秒。
如果你使用默认配置,那默认值 5 分钟显然是不够的,你将有很大概率遭遇 CommitFailedException 异常。将 max.poll.interval.ms 增加到 1000 秒以上的做法就属于上面的第 2 种方法。
- 除了调整 max.poll.interval.ms 之外,你还可以选择调整 max.poll.records 值,减少每次 poll 方法返回的消息数。
还拿刚才的例子来说,你可以设置 max.poll.records 值为 150甚至更少这样每批消息的总消费时长不会超过 300 秒150*2=300即 max.poll.interval.ms 的默认值 5 分钟。
这种减少 max.poll.records 值的做法就属于上面提到的方法 3。
- 场景二
- 从理论上讲,关于该异常你了解到这个程度,已经足以帮助你应对应用开发过程中由该异常带来的“坑”了 。但其实,该异常还有一个不太为人所知的出现场景。了解这个冷门场景,可以帮助你拓宽 Kafka Consumer 的知识面,也能提前预防一些古怪的问题。下面我们就来说说这个场景。
- 之前我们花了很多时间学习 Kafka 的消费者,不过大都集中在消费者组上,即所谓的 Consumer Group。其实Kafka Java Consumer 端还提供了一个名为 **Standalone Consumer** 的独立消费者。它没有消费者组的概念,每个消费者实例都是独立工作的,彼此之间毫无联系。
- 不过,你需要注意的是,独立消费者的位移提交机制和消费者组是一样的,因此独立消费者的位移提交也必须遵守之前说的那些规定,比如独立消费者也要指定 group.id 参数才能提交位移。你可能会觉得奇怪,既然是独立消费者,为什么还要指定 group.id 呢?
没办法,谁让社区就是这么设计的呢?总之,消费者组和独立消费者在使用之前都要指定 group.id。
- 现在问题来了,如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序那么当独立消费者程序手动提交位移时Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。
- 虽然说这个场景很冷门,但也并非完全不会遇到。在一个大型公司中,特别是那些将 Kafka 作为全公司级消息引擎系统的公司中,每个部门或团队都可能有自己的消费者应用,谁能保证各自的 Consumer 程序配置的 group.id 没有重复呢?一旦出现不凑巧的重复,发生了上面提到的这种场景,你使用之前提到的哪种方法都不能规避该异常。
- 令人沮丧的是,无论是刚才哪个版本的异常说明,都完全没有提及这个场景,因此,如果是这个原因引发的 CommitFailedException 异常,前面的 4 种方法全部都是无效的。
- 更为尴尬的是,无论是社区官网,还是网上的文章,都没有提到过这种使用场景。我个人认为,这应该算是 Kafka 的一个 bug。比起返回 CommitFailedException 异常只是表明提交位移失败,更好的做法应该是,在 Consumer 端应用程序的某个地方,能够以日志或其他方式友善地提示你错误的原因,这样你才能正确处理甚至是预防该异常。
- 补充
- 1、请问Standalone Consumer 的独立消费者一般什么情况会用到 2、Standalone Consumer 的独立消费者 使用跟普通消费者组有什么区别的。
- 1. 很多流处理框架的Kafka connector都没有使用consumer group而是直接使用standalone consumer因为group机制不好把控 2. standalone consumer没有rebalance也没有group提供的负载均衡你需要自己实现。其他方面比如位移提交和group没有太大的不同
- 0.10.1.0之后的session.timeout.ms还有什么作用呢 standalone consumer和group consumer在配置上如何区分?
- 用于侦测会话超时。standalone consumer和group的区分体现在API上
### 2.12 多线程开发消费者实例
- Kafka Java Consumer 设计原理
- 谈到 Java Consumer API最重要的当属它的入口类 KafkaConsumer 了。我们说 KafkaConsumer 是单线程的设计,严格来说这是不准确的。因为,从 Kafka 0.10.1.0 版本开始KafkaConsumer 就变为了双线程的设计,**即用户主线程和心跳线程**。
- 所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程而新引入的心跳线程Heartbeat Thread只负责定期给对应的 Broker 机器发送心跳请求以标识消费者应用的存活性liveness。引入这个心跳线程还有一个目的那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
- 不过,虽然有心跳线程,**但实际的消息获取逻辑依然是在用户主线程中完成的**。因此,在消费消息的这个层面上,我们依然可以**安全地认为 KafkaConsumer 是单线程的设计**。
- 其实,在社区推出 Java Consumer API 之前Kafka 中存在着一组统称为 Scala Consumer 的 API。这组 API或者说这个 Consumer也被称为老版本 Consumer**目前在新版的 Kafka 代码中已经被完全移除了**。
- 多线程方案
- 我们来具体分析一下 KafkaConsumer 这个类的使用方法,以及如何推演出对应的多线程方案。
- 我们要明确的是,**KafkaConsumer 类不是线程安全的 (thread-safe)**。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
- 当然了这也不是绝对的。KafkaConsumer 中有个方法是例外的它就是wakeup()你可以在其他线程中安全地调用KafkaConsumer.wakeup()来唤醒 Consumer。
- 鉴于 KafkaConsumer 不是线程安全的事实,我们能够制定两套多线程方案。
- **消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程**。如下图所示:
- ![KafkaConsumer多线程方案1](pic/KafkaConsumer多线程方案1.png)
- **消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑**。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,**处理消息则交由特定的线程池来做**,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:
- ![KafkaConsumer多线程方案2](pic/KafkaConsumer多线程方案2.png)
- 总体来说,这两种方案都会创建多个线程,这些线程都会参与到消息的消费过程中,但各自的思路是不一样的。
- 比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5以实现并行处理的目标它不会进一步分割具体的子任务
而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5则用另外的多个线程来做。
- 这两种方案的优缺点,我们先来看看下面这张表格。
- ![KafkaConsumer多线程方案3](pic/KafkaConsumer多线程方案3.png)
- 我们先看方案 1它的优势有 3 点。
- 实现起来简单,因为它比较符合目前我们使用 Consumer API 的习惯。我们在写代码的时候,使用多个线程并在每个线程中创建专属的 KafkaConsumer 实例就可以了。
- 多个线程之间彼此没有任何交互,省去了很多保障线程安全方面的开销。
- 由于每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑因此Kafka 主题中的每个分区都能保证只被一个线程处理,这样就很容易实现分区内的消息消费顺序。这对在乎事件先后顺序的应用场景来说,是非常重要的优势。
- 说完了方案 1 的优势,我们来看看这个方案的不足之处。
- 每个线程都维护自己的 KafkaConsumer 实例必然会占用更多的系统资源比如内存、TCP 连接等。在资源紧张的系统环境中,方案 1 的这个劣势会表现得更加明显。
- 这个方案能使用的线程数受限于 Consumer 订阅主题的总分区数。我们知道,在一个消费者组中,每个订阅分区都只能被组内的一个消费者实例所消费。假设一个消费者组订阅了 100 个分区,那么方案 1 最多只能扩展到 100 个线程,多余的线程无法分配到任何分区,只会白白消耗系统资源。
当然了,这种扩展性方面的局限可以被多机架构所缓解。除了在一台机器上启用 100 个线程消费数据,我们也可以选择在 100 台机器上分别创建 1 个线程,效果是一样的。因此,如果你的机器资源很丰富,这个劣势就不足为虑了。
- 每个线程完整地执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,造成消息处理速度慢,就很容易出现不必要的 Rebalance从而引发整个消费者组的消费停滞。这个劣势你一定要注意。我们之前讨论过如何避免 Rebalance如果你不记得的话可以回到专栏第 17 讲复习一下。
- 下面我们来说说方案 2。
- 与方案 1 的粗粒度不同,方案 2 将任务切分成了消息获取和消息处理两个部分,分别由不同的线程处理它们。比起方案 1方案 2 的最大优势就在于它的高伸缩性,就是说我们可以独立地调节消息获取的线程数,以及消息处理的线程数,而不必考虑两者之间是否相互影响。
如果你的消费获取速度慢,那么增加消费获取的线程数即可;如果是消息的处理速度慢,那么增加 Worker 线程池线程数即可。
- 它的缺陷
- 它的实现难度要比方案 1 大得多,毕竟它有两组线程,你需要分别管理它们。
- 因为该方案将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消息的线程,因此无法保证分区内的消费顺序。举个例子,比如在某个分区中,消息 1 在消息 2 之前被保存,那么 Consumer 获取消息的顺序必然是消息 1 在前,消息 2 在后,但是,
后面的 Worker 线程却有可能先处理消息 2再处理消息 1这就破坏了消息在分区中的顺序。还是那句话如果你在意 Kafka 中消息的先后顺序,方案 2 的这个劣势是致命的。
- 方案 2 引入了多组线程,使得整个消息消费链路被拉长,最终导致正确位移提交会变得异常困难,结果就是可能会出现消息的重复消费。如果你在意这一点,那么我不推荐你使用方案 2。
- 实现代码示例
- 分享一段方案 1 的主体代码:
```java
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(10000));
// 执行消息处理逻辑
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
```
- 这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构。
- 对于方案 2 来说,核心的代码是这样的:
```java
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
// 最重要的部分
executors.submit(new Worker(record));
}
}
..
```
- 当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑,这样就实现了方案 2 的多线程架构。
- 补充
- Kafka重启时间比较长每次重启一台差不多四五十分钟日志保存12个小时每台数据量差不多几个T想请教一下老师有什么可以优化的参数吗
- 有可能是要加载的日志段数据太多导致的可以增加num.recovery.threads.per.data.dir的值
- 方案2的代码consumer实例也是单线程的
- 嗯,如果唯一用来拉取消息不执行小处理逻辑,那么单线程已然很高效了。
### 2.13 Java 消费者是如何管理TCP连接的?
## 4. Consumer
## 5. Stream
## 6. Connect
## 7. Kafka集群部署与开发
## 8. Kafka集群监控、安全与最佳实践
## 9. Kafka面试点梳理