You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

794 lines
81 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Kafka
## 0. 目录
- Kafka Kafka基本概念及配置
Kafka 客户端实践及原理剖析
Kafka 内核及源码
Kafka 管理和监控
Kafka 流处理
## 1. Kafka基本概念及配置
- 官方QuickStart: https://kafka.apache.org/quickstart
- 官方文档: https://kafka.apache.org/documentation/
- 最新版本 3.2.0 【截止2022-7-20】
- 学习版本 kafka_2.11-2.4.0【前面是scala后面是kafka】 zookeeper-3.5.7【截止2022-7-20】
### 1.1 概念
- 概述
- Kafka 是分布式的流处理平台
- Kafka 是基于Zookeeper的分布式消息系统
- Kafka具有高吞吐率、高性能、实时、高可靠等特点
- Kafka 的组成概念
- Topic: 一个虚拟的概念, 由1到多个Partitions组成
- Partition: 实际消息存储单位
- Producer: 消息生产者
- Consumer: 消息消费者
-
### 1.2 安装Zookeeper(快捷)
```shell
# 修改配置文件
cp zoo_sample.cfg zoo.cfg
# 需要修改参数
dataDir=指定一个存储配置文件的目录
# 启动
./zkServer.sh start
# 连接测试
./zkCli.sh
```
### 1.3 安装Kafka
```shell
# 修改基本的配置文件
vim server.properties
# 需要修改的配置项
listeners=PLAINTEXT://[你的ip地址]:9092
advertised.listeners=PLAINTEXT://[你的ip地址]:9092
log.dirs=你的kafka的日志目录
zookeeper.connect=[你的ip地址]:2181
```
### 1.4 Kafka 的线上方案
- 操作系统
- 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的 快速数据传输特性。
- Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证, 千万不要应用于生产环境。
- 磁盘
- **普通的机械磁盘** 即可, Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘 最大的劣势,即随机读写操作慢。
- 追求性价比的公司可以**不搭建 RAID**,使用**普通磁盘组成存储空间即可**。 [Kafka 1.1开始正式支持JBOD了。再说Kafka本身在软件层面也提供了冗余的机制来对 抗磁盘损坏。]
- 磁盘容量
- 在规划磁盘容量时你需要考虑下面这几个元素:
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
- 例子:
- 假设你所在公司有个业务每天需要向 Kafka 集群发送 **1 亿条消息**,每条消息**保存两份**以防止数据丢失,另外消息默认**保存两周** 时间。现在假设消息的**平均大小是 1KB**?
- 每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空 间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息 数据还有其他类型的数据,
- 比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空 间,因此总的存储容量就是 220GB。既然要保存两周那么整体容量即为 220GB * 14 大约 3TB 左右。Kafka 支持数据的压缩,
假设压缩比是 0.75,那么最后你需要规划的存储 空间就是 0.75 * 3 = 2.25TB。
- 带宽
- 对于 Kafka 这种通过网络大量进行数据传输的框架而言,**带宽特别容易成为瓶颈**。事实 上,在我接触的真实案例当中,带宽资源不足导致 Kafka 出现性能问题的比例至少占 60% 以上。
如果你的环境中还涉及跨机房传输,那么情况可能就更糟了。
- 带宽也主要有 两种:1Gbps 的千兆网络和 10Gbps 的万兆网络,特别是千兆网络应该是一般公司网络的标准配置了。
- 与其说是带宽资源的规划,其实真正要规划的是所需的 Kafka 服务器的数量。
- 假设你公司 的机房环境是千兆网络,即 1Gbps现在你有个业务其业务目标或 SLA 是在 1 小时内处 理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
- 让我们来计算一下,由于带宽是 1Gbps即每秒处理 1Gb 的数据,假设每台 Kafka 服务 器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混布其他服务,毕竟真实环境中不建议这么做。
通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其 他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比 较合理的值,
也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源, 故通常要再额外预留出 2/3 的资源,
即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需 要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少 此值。 好了,有了 240Mbps
我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根 据这个目标,我们**每秒需要处理 2336Mb [1024*1024/3600*8] 的数据,除以 240约等于 10 台服务器**。
如果 消**息还需要额外复制两份,那么总的服务器台数还要乘以 3即 30 台**。
- Kafka 性能测试脚本
- kafka-producer-perf-test 脚本还不错kafka-consumer-perf-test有点难用
- 混部
- 一个机器上 部署一个zookeeper和一个kafka, 如果配置好**ZooKeeper事务日志**(比如设置好autopurge.purgeInterval及定期删除 snapshot等)它对IO的需求不大混布也是可以的。
---
- 最最最重要的集群参数配置
- Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题(Topic)级别的参数、JVM 端参数和操作系统级别的参数
- Broker 端参数
- 首先 Broker 是需要配置存储信息的,即 Broker 使用哪些 磁盘。那么针对存储信息的重要参数有以下这么几个:
- **log.dirs**: 这是非常重要的参数,指定了 Broker 需要 使用的若干个文件目录路径。要知道这个参数是没有默认 值的,这说明什么?这说明它**必须由你亲自指定**。
- log.dir: 注意这是 dir结尾没有 s说明它只能表示 单个路径,它是**补充上一个参数用的**。
- 这两个参数应该怎么设置呢?很简单,你只要设置 log.dirs即第一个参数就好了不要设置log.dir。而且更重要的是在线上生产环境中一定要为log.dirs配置 多个路径,
具体格式是一个 CSV 格式,也就是用逗号分隔 的多个路径,比 如/home/kafka1,/home/kafka2,/home/kafka3这 样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。
- 这样做有两个好处:
- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新 引入的强大功能。要知道在以前,只要 Kafka Broker 使 用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。 但是自 1.1 开始,这种情况被修正了,
坏掉的磁盘上的数 据会自动地转移到其他正常的磁盘上,而且 Broker 还能 正常工作。这个改进正是我们舍弃 RAID 方案的基 础:没有这种 Failover 的话,我们只能依靠 RAID 来提 供保障。
- 与 ZooKeeper 相关的设置
- 首先 ZooKeeper 是 做什么的呢?它是一个分布式协调框架,负责协调管理并保 存 Kafka 集群的所有元数据信息比如集群都有哪些Broker 在运行、创建了哪些 Topic每个 Topic 都有多少 分区以及这些分区的 Leader 副本都在哪些机器上等信息。
- Kafka 与 ZooKeeper 相关的最重要的参数当属 **zookeeper.connect**
- 这也是一个 CSV 格式的参数,比 如我可以指定它的值为 **zk1:2181,zk2:2181,zk3:2181**。2181 是 ZooKeeper 的默认端口。
- 如果我让**多个 Kafka 集群使用同一套 ZooKeeper 集群**,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概 念,类似于别名。
- 如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2那么两套集群的zookeeper.connect参数可以 这样指定:**zk1:2181,zk2:2181,zk3:2181/kafka1和 zk1:2181,zk2:2181,zk3:2181/kafka2**。
- **切记 chroot 只需要写一次,而且是加到最后的**。我经常碰到有人这样指 定: zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3这样的格式是不对的。
- 第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参 数:
- **listeners**: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
- advertised.listeners: 和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的, 就是说**这组监听器是 Broker 用于对外发布的**。
- [内网环境访问Kafka不需要配置这个参数常见的玩法是:你的Kafka Broker机器上配置了双网卡一 块网卡用于内网访问(即我们常说的内网IP);另一个块用 于外网访问。那么你可以配置listeners为内网IP advertised.listeners为外网IP。]
- ~~host.name/port~~: 列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
- 监听器的概念,从构成上来说,它是若干个逗 号分隔的三元组,**每个三元组的格式为<协议名称,主机 名,端口号>**。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密 传输等;
也可能是你自己定义的协议名字,比如 CONTROLLER: //localhost:9092。
- 一旦你自己定义了协议名称,你必须还要指定 listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议比如指定 listener.security.protocol.map=CONTROLLER:P LAINTEXT表示CONTROLLER这个自定义协议底层使用明文 不加密传输数据。
- 经常有人会问主机 名这个设置中我到底使用 IP 地址还是主机名。这里我给出 统一的建议:**最好全部使用主机名**,即 **Broker 端和 Client 端应用配置中全部填写主机名**。 Broker 源代码中也使用的 是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
- 第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数:
- auto.create.topics.enable:是否允许自动创建 Topic。
- [建议最好设置成 **false**,即不允许自动创建 Topic。每个部门被分配的 Topic 应该由运维严格把控,决不能允许自行创建任何 Topic。]
- unclean.leader.election.enable:是否允许 Unclean Leader 选举。
- [每个分区都有多个副本来提供高可用。 在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。建议你还是显式地把它设置成 **false** 吧]
- auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。
- [对生产环境影响非常大,设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满 足一定的条件才会发生。严格来说它与上一个参数中Leader 选举的最大不同在于,
它不是选 Leader而是换 Leader!比如 Leader A 一直表现得很好,但若 true则有可能 一段时间后 Leader A 就要被强行卸任换成 Leader B。建议你在生产环境中把这 个参数设置成 **false**。]
- 最后一组参数是数据留存方面的,即:
- log.retention.{hour|minutes|ms}:这是个“三 兄弟”,都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。
- [虽然 ms 设置有最高的优先级,但是 通常情况下我们还是设置 hour 级别的多一些,比如 log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使 用,那么这个值就要相应地调大。]
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
- [这个值默认是 -1 表明你想在这台 Broker 上保存多少数据都可以,至少在容 量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参 数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集 群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间]
- message.max.bytes:控制 Broker 能够接收的最大消息大小。
- [不能使用默认值的参数,默认的 1000012 太少了,还不到 1MB。实际场景中突 破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一 个比较大的值还是比较保险的做法。毕竟它只是一个标尺而 已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置 大一点也不会耗费什么磁盘空间的。]
- 补充参数
- gg.handler.kafkahandler.Mode = tx gg.handler.kafkahandler.Mode = op 这两个的差别。遇到时 dml 数据会丢失的情况。用的是 op 。
- 当设置成op单个数据库表的变更(插入、更新、删除) 会被当成一条Kafka消息发送;当设置成tx时数据库事务 所做的所有变更统一被封装进一条Kafka消息并在事务提 交后被发送。后者有事务性的保障,至少有原子性方面的保证,不会丢失部分 CDC 数据。
- Topic 级别参数
- Topic 级别参数会**覆盖全局 Broker 参数的值**,而每个 Topic 都能设置自己的参数值, 这就是所谓的 Topic 级别参数。
- **retention.ms**: 规定了该 Topic 消息被保存的时长。 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦 设置了这个值,它会覆盖掉 Broker 端的全局参数值。
- **retention.bytes**:规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1表示可以 无限使用磁盘空间。
- Topic 级别 参数的设置就是这种情况,我们有两种方式可以设置:
- 创建 Topic 时进行设置
- 修改 Topic 时设置
- 如何在创建 Topic 时设置这些参数
- 你的部门需要将交易数据发送到 Kafka 进行处理,需要保 存最近半年的交易数据,同时这些数据很大,通常都有几 MB但一般不会超过 5MB。现在让我们用以下命令来创建 Topic:
- bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replication-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880
- 请注意结尾处的--config设置我们就是在 config 后面指定了想要设置的 Topic 级别参数。
- 使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:
- bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760
- [你最好始终坚持使用第二种方式来设置并且在未来Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。]
- JVM 参数
- Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。
- 有条件的话至少使用 Java 8 吧, [Kafka 2.0已经不支持Java 7了2.1版本开始初步支持Java 11但不建议生产环境用11所以还是使用Java 8吧。]
- 堆大小这个参数至关重要。无脑给出一个通用的建议:将你的 JVM 堆大小设置成 **6GB** 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka说实话默认的 1GB 有点小,
毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例Heap Size 不能太小。
- [没有通用的标准只有一个最佳实践值6GB。最好还是监控一下实时的堆大小特别是**GC之后的live data大小通常将heapsize设置成其1.5~2倍就足以了**]
- JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7那么可以根据以下法则选择合适的垃圾回收器
- 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
- 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
- 当然了,如果你已经在使用 Java 8 了[G1是jdk9中默认的jdk8还是需要显式指定的]。在没有任何调优的情况下G1 表现得要比 CMS 出色,主要体现在更少的 Full GC需要调整的参数更少等所以使用 G1 就好了。
- Java8默认的新生代垃圾回收器是UseParallelGC可以用-XX:+PrintCommandLineFlags -version查看如果显示指定 -XX:+UseCurrentMarkSweepGC 的话,会默认开启 -XX:+UseParallelGC
- 确定好了要设置的 JVM 参数,我们该如何为 Kafka 进行设置呢?
- 只需要设置下面这两个环境变量即可:
- KAFKA_HEAP_OPTS指定堆大小。
- KAFKA_JVM_PERFORMANCE_OPTS指定 GC 参数。
- 比如你可以这样启动 Kafka Broker即在启动 Kafka Broker 之前,先设置上这两个环境变量:
- $> **export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g**
- $> **export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true**
- $> **bin/kafka-server-start.sh config/server.properties**
- 操作系统参数
- Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个:
- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间
- 首先是 **ulimit -n**
- 觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如 **ulimit -n 1000000**
- 但不设置的话后果很严重比如你会经常看到“Too many open files”的错误。
- 其次是文件系统类型的选择。
- 这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,**XFS** 的性能要强于 ext4所以生产环境最好还是使用 XFS。对了最近有个 Kafka 使用 **ZFS** 的数据报告,貌似性能更加强劲,有条件的话不妨一试。
- 第三是 swap 的调优
- 网上很多文章都提到设置其为 0将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0当物理内存耗尽时操作系统会触发 OOM killer 这个组件,
它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,
我**个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1**。
### 1.5 kafka的基本操作
```shell
# 1、启动Kafka - todo 这个命令会让控制台有日志生成不太方便
bin/kafka-server-start.sh config/server.properties &
# 2、停止Kafka
bin/kafka-server-stop.sh
# 3、创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiangzh-topic
# 4、查看已经创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 5、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.220.128:9092 --topic jiangzh-topic
# 6、接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic jiangzh-topic --from-beginning
# {"orderId":"002","price":"80"}
```
```shell
# 【旧版】
kafka-topics.sh --zookeeper 172.16.26.183:2181 --list
# 【新版】
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --list
# zookeeper is not a recognized option主要原因是 Kafka 版本过高,命令不存在)
# 创建topic主题
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --create --topic topic1 --partitions 1 --replication-factor 3
# --create 命令后 --topic 为创建topic 并指定 topic name
# --partitions 为指定分区数量
# --replication-factor 为指定副本集数量
# 向kafka集群发送数据
# 【无key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1
# 【有key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --property parse.key=true
# 默认消息键与消息值间使用“Tab键”进行分隔切勿使用转义字符(\t)
# kafka命令接受数据
kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning
# kafka查看消费进度当我们需要查看一个消费者组的消费进度时则使用下面的命令(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】
kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1
```
## 2. 客户端实践及原理剖析
### 2.1 生产者消息分区机制原理剖析
- 为什么分区?
- Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
- 主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
- 为什么使用分区的概念而不是直接使用多个主题呢?
- 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了**实现系统的高伸缩性Scalability**
- 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能**独立地执行各自分区的读写请求处理**。
- 并且,我们还可以通过**添加新的节点机器来增加整体系统的吞吐量**。
- 实际上分区的概念以及分区数据库早在 1980 年就已经有大牛们在做了,比如那时候有个叫 Teradata 的数据库就引入了**分区的概念**。
- Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard而在 HBase 中则叫 Region在 Cassandra 中又被称作 vnode
- 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如**实现业务级别的消息顺序的问题**
- 都有哪些分区策略?
- 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
- 如果要自定义分区策略你需要显式地配置生产者端的参数partitioner.class
- 在编写生产者程序时你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。
- 这个接口也很简单只定义了两个方法partition()和close(),通常你只需要实现最重要的 partition 方法。
- 方法签名int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- 这里的topic、key、keyBytes、value和valueBytes都属于消息数据cluster则是集群信息比如当前 Kafka 集群共有多少主题、多少 Broker 等)
- Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
- 只要你自己的**实现类定义好了 partition 方法同时设置partitioner.class参数为你自己实现类的 Full Qualified Name**,那么生产者程序就会按照你的代码逻辑对消息进行分区。
- 比较常见的分区策略
- **轮询策略**
- 也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0第二条被发送到分区 1第三条被发送到分区 2以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0就像下面这张图展示的那样。
- ![轮询策略](pic/轮询策略.png)
- 这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API **默认提供的分区策略**。如果你未指定partitioner.class参数那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
- [轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。]
- **随机策略**
- 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
- ![随机策略](pic/随机策略.png)
- 如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return ThreadLocalRandom.current().nextInt(partitions.size());
- 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
- 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以**如果追求数据的均匀分布,还是使用轮询策略比较好**。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
- **按消息键保序策略**
- 也称 Key-ordering 策略。这个名词是我自己编的Kafka 官网上并无这样的提法。
- Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。
特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,
**由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略**,如下图所示。
- ![按消息键保序策略](pic/按消息键保序策略.png)
- 实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return Math.abs(key.hashCode()) % partitions.size();
- 前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:**如果指定了 Key那么默认实现按消息键保序策略如果没有指定 Key则使用轮询策略。**
- 如何实现消息的顺序问题?
- 企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。
- 当时企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。
这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。
- 后来经过了解和调研,我发现这种具有因果关系的消息都有一定的特点,比如在**消息体中都封装了固定的标志位**,改进建议他们**对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区**
这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。
- 这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把**标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想**。
- 其他分区策略
- 其实还有一种比较常见的,即所谓的**基于地理位置的分区策略**。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
- 假设的所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在考虑在南方找个城市(比如广州)再创建一个机房;
另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在广州。
- 计划为每个新注册用户提供一份注册礼品,比如南方的用户可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。
如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
- 但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。
换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!
- 此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
- 我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。
- 补充问题
- 在消息重试的时候分区策略会重新再计算一次吗比如一开始选择到5号分区但是5号分区有问题导致重试重试的时候可以重试发送到别的分区上吗
- 不会的。消息重试只是简单地将消息重新发送到之前的分区
- 在看kafka-client生产者默认分区源码时看到了cluster.partitionsForTopic和cluster.availablePartitionsForTopic请问什么时候分区是available什么时候是不unavailable的
- 分区没有leader的时候就是unavailable了。某些操作会导致瞬间没有leader比如分区reassign、换leader等
- RocketMQ与Kafka的主要区别
- Kafka吞吐量大多是面向大数据场景。RocketMQ吞吐量也很强 不过它号称是金融业务级的消息中间件,也就是说可以用于实际的业务系统;
- RocketMQ毕竟是阿里出品在国内技术支持力度要比Kafka强
- Kafka现在主要发力StreamingRocketMQ在流处理这块表现如何我不太清楚至少streaming不是它现阶段的主要卖点。
- 其他方面这两者确实都差不多
- 广州机房怎么消费广州partition的数据consumer如何指定消费的partition。
- 使用这个方法consumer.assign()直接消息指定分区
### 2.2 生产者压缩算法面面观
- 说起压缩compression我相信你一定不会感到陌生。它秉承了**用时间去换空间的经典 trade-off 思想**,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。
- 怎么压缩?
- Kafka 是如何压缩消息的呢?要弄清楚这个问题,就要从 Kafka 的消息格式说起了。目前 **Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本**。V2 版本是 Kafka 0.11.0.0 中正式引入的。
- 不论是哪个版本Kafka 的消息层次都分为两层:**消息集合message set以及消息message**。**一个消息集合中包含若干条日志项record item而日志项才是真正封装消息的地方**。
Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka **通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作**。
- V2 版本主要是针对 V1 版本的一些弊端做了修正
- 一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。
- 在 V2 版本中,消息的 CRC 校验工作就被移到了消息集合这一层。
- V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。
- 之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。
- V2 版本都比 V1 版本节省磁盘空间。
- 何时压缩?
- 在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
- 生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。
- 下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
```
- 这里比较关键的代码行是 props.put(“compression.type”, “gzip”),它表明该 Producer 的压缩算法使用的是 GZIP。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
- 在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?其实大部分情况下 **Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改**,但这里的“大部分情况”也是要满足一定条件的。
- 有两种例外情况就可能让 Broker 重新压缩消息。
- 情况一Broker 端指定了和 Producer 端不同的压缩算法。
- Producer 说:“我要使用 GZIP 进行压缩。”
- Broker 说:“不好意思,我这边接收的消息必须使用 Snappy 算法进行压缩。”
- Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type和上面那个例子中的同名。但是这个**参数的默认值是 producer**,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。
可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,**通常表现为 Broker 端 CPU 使用率飙升**。
- 情况二Broker 端发生了消息格式转换。
- 在一个生产环境中Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,
它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
- 所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。如果有兴趣你可以深入地了解下 Zero Copy 的原理。
- 何时解压缩?
- 通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时Broker 依然原样发送出去,**当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息**。
- Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。**Kafka 会将启用了哪种压缩算法封装进消息集合中**,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
- 住这句话:**Producer 端压缩、Broker 端保持、Consumer 端解压缩。**
- 除了在 Consumer 端解压缩Broker 端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,**目的就是为了对消息执行各种验证**。
我们必须承认**这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言**。
- 国内京东的小伙伴们刚刚向社区提出了一个 bugfix建议去掉因为做消息校验而引入的解压缩。据他们称**去掉了解压缩之后Broker 端的 CPU 使用率至少降低了 50%**。
- 目前社区并未采纳这个建议,原因就是这种消息校验是非常重要的,不可盲目去之。毕竟先把事情做对是最重要的,**在做对的基础上,再考虑把事情做好做快**。
- 你也可以思考一下,是否有一个两全其美的方案,既能避免消息解压缩也能对消息执行校验。
- 各种压缩算法对比
- 在 Kafka 2.1.0 版本之前Kafka 支持 3 种压缩算法:**GZIP、Snappy 和 LZ4**。从 2.1.0 开始Kafka 正式支持 **Zstandard** 算法(简写为 zstd。它是 Facebook 开源的一个压缩算法能够提供超高的压缩比compression ratio
- 对了,看一个压缩算法的优劣,有两个重要的指标:
- 一个指标是**压缩比**,原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5显然压缩比越高越好
- 另一个指标就是**压缩 / 解压缩吞吐量**,比如每秒能压缩或解压缩多少 MB 的数据。同样地,吞吐量也是越高越好。
- Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:
- ![Zstandard压缩算法对比](pic/Zstandard压缩算法对比.png)
- zstd 算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩。
- 反观 LZ4 算法,它在吞吐量方面则是毫无疑问的执牛耳者。
- 在实际使用中GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。
- 但对于 Kafka 而言,它们的性能测试结果却出奇得一致,
- 即**在吞吐量方面LZ4 > Snappy > zstd 和 GZIP**
- 而**在压缩比方面zstd > LZ4 > GZIP > Snappy**。
- 具体到**物理资源**,使用 **Snappy 算法占用的网络带宽最多zstd 最少**,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
-**CPU 使用率**方面,各个算法表现得差不多,**只是在压缩时 Snappy 算法使用的 CPU 较多一些**
- 而**在解压缩时 GZIP 算法则可能使用更多的 CPU**。
- 最佳实践
- 首先来说压缩。何时启用压缩是比较合适的时机呢?
- 你现在已经知道 Producer 端完成的压缩,那么启用压缩的一个条件就是 **Producer 程序运行机器上的 CPU 资源要很充足**。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
- 除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。
- 事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,**带宽可是比 CPU 和内存还要珍贵的稀缺资源**,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。
- **如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗**。
- 其次说说解压缩。其实也没什么可说的。
- 一旦启用压缩,解压缩是不可避免的事情。
- 这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。
- 补充问题
- 正常情况下broker端会原样保存起来但是为了检验需要解压缩。该怎么去理解这个过程呢broker端解压缩以后还会压缩还原吗
- 它只是解压缩读取而已,不会将解压缩之后的数据回写到磁盘。另外就像我置顶的留言那样,目前社区已经接纳了京东小伙伴的修改,貌似可以绕过这部分解压缩了.
- 规避了broker端为执行校验而做的解压缩操作代码也merge进了2.4版本。有兴趣的同学可以看一下:
- https://issues.apache.org/jira/browse/KAFKA-8106
- 消息层次、消息集合、消息、日志项这些概念与它们之间的关系
- 消息批次RecordBatch里面包含若干条消息record)。 你可以认为消息批次和消息集合是等价的,消息和日志项是等价的。这样消息层次有两层:外层是消息批次(或消息集合);里层是消息(或日志项)。
Producer以recordbatch为单位发送消息对于V2版本一个batch中通常包含多条消息。在V2版本中在batch层面计算CRC值在V1版本中每条消息都要计算CRC值。
### 2.3 无消息丢失配置怎么实现?
- 一句话概括Kafka 只对**“已提交”的消息committed message做有限度的持久化保证**。
- 第一个核心要素是 **“已提交的消息”**。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
- 那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况Kafka 只对已提交的消息做持久化保证这件事情是不变的。
- 第二个核心要素就是 **“有限度的持久化保证”**
- 假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中**至少有 1 个存活**。只要这个条件成立Kafka 就能保证你的这条消息永远不会丢失。
- “消息丢失”案例
- 案例 1生产者程序丢失数据
- 目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API那么它通常会立即返回但此时你不能认为消息发送已成功完成。
- 这种发送方式有个有趣的名字叫“fire and forget”翻译一下就是“发射后不管”。它的意思是执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”
- 如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?
- 其实原因有很多,例如**网络抖动,导致消息压根就没有发送到 Broker 端**;或者**消息本身不合格导致 Broker 拒绝接收**(比如消息太大了,超过了 Broker 的承受能力)
- 就像前面说过的Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。
- 解决此问题的方法非常简单:
- Producer **永远要使用带有回调通知的发送 API**,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
- (回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
- 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。
- 总之,**处理发送失败的责任在 Producer 端而非 Broker 端。**
- 案例 2消费者程序丢失数据
- Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。
- ![Consumer端的位移数据](pic/Consumer端的位移数据.png)
- 比如对于 Consumer A 而言,它当前的位移值就是 9Consumer B 的位移值是 11。
- 这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。
- 正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。**不能颠倒**
- 要对抗这种消息丢失,办法很简单:**维持先消费消息(阅读),再更新位移(书签)的顺序即可**。这样就能最大限度地保证消息不丢失。
- 当然,**这种处理方式可能带来的问题是消息的重复处理**,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。
- 还存在一种比较隐蔽的消息丢失场景。
- 对于 Kafka 而言, Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。
假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
- 这里的**关键在于 Consumer 自动提交位移**
- 这个问题的解决方案也很简单:
- 如果是多线程异步处理消费消息Consumer 程序不要开启自动提交位移,而是要应用程序**手动提交位移**。
- 单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。
- 最佳实践
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,**一定要使用带有回调通知的 send 方法**。
- **设置 acks = all**。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- **设置 retries 为一个较大的值**。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- **设置 unclean.leader.election.enable = false**。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader必然会造成消息的丢失。
故一般都要将该参数设置成 false即不允许这种情况的发生。
- **设置 replication.factor >= 3**。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- **设置 min.insync.replicas > 1**。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等那么只要有一个副本挂机整个分区就无法正常工作了。我们不仅要改善消息的持久性防止数据丢失还要在不降低可用性的基础上完成。
**推荐设置成 replication.factor = min.insync.replicas + 1**。
- 确保消息消费完成再提交。Consumer 端有个参数 **enable.auto.commit最好把它设置成 false**,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
- 补充
- 设置 acks = all。表明所有副本 Broker 都要接收到消息该消息才算是“已提交”。如果所有的Broker都要收到消息才能算作已提交会不会对系统的吞吐量影响很大另外这里的副本指的是不是仅仅是ISR?
- 碰到的实际场景影响还是很大的。acks=all时大部分的请求处理延时都花在了follower同步上。 是的acks=all表明所有ISR中的副本都要同步。
### 2.4 客户端都有哪些不常见但是很高级的功能?
- 什么是拦截器?
- 基本思想就是允许应用程序在**不修改逻辑的情况下****动态地实现**一组可**插拔的事件处理逻辑链**。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。
- Kafka 拦截器
- Kafka **拦截器分为生产者拦截器和消费者拦截器**。
- 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。
值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,**Kafka 会按照添加顺序依次执行拦截器逻辑**。
- Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 **interceptor.classes**,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。
- 假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor
那么你需要按照以下方法在 Producer 端指定拦截器:
```java
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
```
- 我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢?
- 这两个类以及你自己编写的所有 Producer 端拦截器实现类都**要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口**。该接口是 Kafka 提供的,里面有两个核心的方法。
- onSend该方法会在消息发送之前被调用。如果你想在**发送之前**对消息“美美容”,这个方法是你唯一的机会。
- onAcknowledgement该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?**onAcknowledgement 的调用要早于 callback 的调用**。
值得注意的是,**这个方法和 onSend 不是在同一个线程中被调用的**,因此如果你在这两个方法中调用了某个共享可变对象,**一定要保证线程安全哦**。
还有一点很重要,这个方法处在 Producer 发送的主路径中,所以**最好别放一些太重的逻辑进去**,否则你会发现你的 Producer TPS 直线下降。
- 同理,指定消费者拦截器也是同样的方法,只是具体的实现类要**实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口**,这里面也有两个核心方法。
- onConsume该方法在消息返回给 Consumer 程序**之前**调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
- onCommitConsumer 在**提交位移之后调用**该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
- 一定要注意的是,指定拦截器类时要指定它们的**全限定名**,即 full qualified name。通俗点说就是要把完整包名也加上不要只有一个类名在那里并且还要保证你的 Producer 程序能够正确加载你的拦截器类。
- 典型使用场景
- 其实,跟很多拦截器的用法相同,**Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计**等多种功能在内的场景。
- Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去**追踪集群间消息的流转路径**。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。
- 通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够**从具体的消息层面上去收集这些数据**。这就是 Kafka 拦截器的一个非常典型的使用场景。
- 我们再来看看消息审计message audit的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要**涉及多租户以及消息审计的功能**。
- 作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。
一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。
- 案例分享
- 通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。
- 某个业务只有一个 Producer 和一个 Consumer他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少但是目前 Kafka 并没有提供这种端到端的延时统计。
- 既然是要计算总延时,**那么一定要有个公共的地方来保存它**,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中。
- 实现生产者拦截器
```java
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
```
- 上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。
- 消费者端的拦截器实现,代码如下:
```java
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
```
- 在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用**当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中**。
之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。
- 创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。
这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。
### 2.5 Java生产者是如何管理TCP连接的
- 为何采用 TCP
- 从社区的角度来看,在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如**多路复用请求以及同时轮询多个连接**的能力。
- 所谓的多路复用请求,即 multiplexing request是指将两个或多个数据流合并到底层单一物理连接中的过程。**TCP 的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流**。
其实严格来说,**TCP 并不能多路复用,它只是提供可靠的消息交付语义保证**,比如自动重传丢失的报文。
- 作为一个基于报文的协议TCP 能够被用于多路复用连接场景的前提是,上层的应用协议(比如 HTTP允许发送多条消息。
- 除了 TCP 提供的这些高级功能有可能被 Kafka 客户端的开发人员使用之外,社区还发现,**目前已知的 HTTP 库在很多编程语言中都略显简陋**。
- 基于这两个原因Kafka 社区决定采用 TCP 协议作为所有请求通信的底层协议。
- Kafka 生产者程序概览
- Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。
- 第 1 步:构造生产者对象所需的参数对象。
- 第 2 步:利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
- 第 3 步:使用 KafkaProducer 的 send 方法发送消息。
- 第 4 步:调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。
- 代码示例:
```java
Properties props = new Properties ();
props.put("参数 1", "参数 1 的值")
props.put("参数 2", "参数 2 的值")
//
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<String, String>(……), callback);
//
}
```
- 这段代码使用了 Java 7 提供的 try-with-resource 特性,所以并没有显式调用 producer.close() 方法。无论是否显式调用 close 方法,所有生产者程序大致都是这个路数。
- 当我们开发一个 Producer 应用时,生产者会向 Kafka 集群中指定的主题Topic发送消息这必然涉及与 Kafka Broker 创建 TCP 连接。那么Kafka 的 Producer 客户端是如何管理这些 TCP 连接的呢?
- 何时创建 TCP 连接?
- 生产者代码是什么时候创建 TCP 连接的。就上面的那段代码而言,可能创建 TCP 连接的地方有两处Producer producer = new KafkaProducer(props) 和 producer.send(msg, callback)。你觉得连向 Broker 端的 TCP 连接会是哪里创建的呢?前者还是后者,抑或是两者都有?
- **在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。**
- 测试环境中的日志来说明这一点:
```txt
[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)
```
- 如果不调用 send 方法,这个 Producer 都不知道给哪个主题发消息,它又怎么能知道连接哪个 Broker 呢?难不成它会连接 bootstrap.servers 参数指定的所有 Broker 吗?
- 解释一下 bootstrap.servers 参数。它是 Producer 的核心参数之一,指定了这个 Producer 启动时要连接的 Broker 地址。
- 这里的“启动时”,代表的是 Producer 启动时会发起与这些 Broker 的连接。因此,如果你为这个参数指定了 1000 个 Broker 连接信息,那么很遗憾,你的 **Producer 启动时会首先创建与这 1000 个 Broker 的 TCP 连接**
- 在实际使用过程中,我并**不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中**,通常你指定 **34 台就足以**了。因为 **Producer 一旦连接到集群中的任一台 Broker就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker**
- 在 KafkaProducer 实例被创建后以及消息被发送前Producer 应用就开始创建与两台 Broker 的 TCP 连接了。
- 你不需要了解 RecordAccumulator 类是做什么的,你只要知道它主要的数据结构是一个 ConcurrentMap<TopicPartition, Deque>。TopicPartition 是 Kafka 用来表示主题分区的 Java 对象,本身是不可变对象。而 RecordAccumulator 代码中用到 Deque 的地方都有锁的保护,所以基本上可以认定 RecordAccumulator 类是线程安全的。
- **TCP 连接是在创建 KafkaProducer 实例时建立的。**
- 它只会在这个时候被创建吗?
- 当然不是TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。
- 当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地当要发送消息时Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。
- Producer 更新集群元数据信息的两个场景
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000即 5 分钟也就是说不管集群那边是否有变化Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
- 何时关闭 TCP 连接?
- Producer 端关闭 TCP 连接的方式有两种:**一种是用户主动关闭;一种是 Kafka 自动关闭**。
- 第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close() 方法来关闭。
- 第二种是 Kafka 帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1TCP 连接将成为永久长连接。
当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive因此 keepalive 探活机制还是会遵守的。
- 在第二种方式中TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。
### 2.6 幂等生产者和事务生产者是一回事吗?
- Kafka 消息交付可靠性保障以及精确处理一次语义的实现。
- 所谓的消息交付可靠性保障,**是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺**。常见的承诺有以下三种:
- 最多一次at most once消息可能会丢失但绝不会被重复发送。
- 至少一次at least once消息不会丢失但有可能被重复发送。
- 精确一次exactly once消息不会丢失也不会被重复发送。
- 目前Kafka **默认提供的交付可靠性保障是第二种**,即至少一次。
- Kafka 也可以提供最多一次交付保障,**只需要让 Producer 禁止重试即可**。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。
- 那么问题来了Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:**幂等性Idempotence和事务Transaction**。
- 什么是幂等性Idempotence
- 幂等性有很多好处,其最大的优势在于**我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态**。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。
- 幂等性 Producer
- 在 Kafka 中Producer **默认不是幂等性的**,但我们可以创建幂等性 Producer。它其实是 **0.11.0.0 版本引入的新功能**
- 在此之前Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可
- **props.put(“enable.idempotence”, ture)**,或 **props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG true)**
- enable.idempotence 被设置成 true 后Producer **自动升级成幂等性 Producer**,其他所有的代码逻辑都不需要改变。
- **kafka 自动帮你做消息的重复去重**。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。
- 必须要了解幂等性 Producer 的作用范围
- 首先,它只能保证**单分区上的幂等性**,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
- 其次,它只能实现**单会话上的幂等性**,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
- 如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是**事务transaction或者依赖事务型 Producer**。这也是幂等性 Producer 和事务型 Producer 的**最大区别**
- 事务
- 经典的 ACID即原子性Atomicity、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。
- 隔离性表明并发执行的事务彼此相互隔离,互不影响。
- Kafka 自 **0.11 版本开始也提供了对事务的支持**,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
- 事务型 Producer
- 事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后Kafka 依然保证它们发送消息的精确一次处理。
- 设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 **enable.idempotence = true**
- **设置 Producer 端参数 transctional. id**。最好为其设置一个有意义的名字。
- 此外,你还需要在 Producer 代码中做一些调整,如这段代码所示:
```java
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
```
- 和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
- 这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka要么它们全部提交成功要么全部写入失败。实际上即使写入失败Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。
因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。
- 修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:
- read_uncommitted这是默认值表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer那么对应的 Consumer 就不要使用这个值。
- read_committed表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
- 比起幂等性 Producer事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。
- 补充
- 事务的使用场景
- 事务更多用在Kafka Streams中。如果要实现流处理中的精确一次语义事务是不可少的。
- 事务型 Producer 可以实现一组消息要么全部写入成功,要么全部失败,但是事务型 Producer 是具体怎么实现多分区以及多会话上的消息无重复的呢?
- 主要的机制是两阶段提交2PC。引入了事务协调器的组件帮助完成分布式事务
### 2.7 消费者组到底是什么?
- 用一句话概括就是Consumer Group 是 Kafka 提供的**可扩展且具有容错性的消费者机制**。
- 既然是一个组那么组内必然可以有多个消费者或消费者实例Consumer Instance它们共享一个公共的 ID这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题Subscribed Topics的所有分区Partition。当然每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。
- Consumer Group 记住下面这三个特性
- Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,**使用进程更为常见一些**。
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
- Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
- 两种消息引擎模型
- 点对点模型和发布 / 订阅模型,前者也称为消费队列。
- Kafka 的 Consumer Group 就是这样的机制。当 Consumer Group 订阅了多个主题后,**组内的每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息**。
- Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。再加上 Broker 端的消息留存机制Kafka 的 Consumer Group 完美地规避了上面提到的伸缩性差的问题。
- 可以这么说Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:
- 如果所有实例都属于同一个 Group那么它实现的就是消息队列模型如果所有实例分别属于不同的 Group那么它实现的就是发布 / 订阅模型。
- 在实际使用场景中,我怎么知道一个 Group 下该有多少个 Consumer 实例呢?
- **理想情况下Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。**
- 假设一个 Consumer Group 订阅了 3 个主题,分别是 A、B、C它们的分区数依次是 1、2、3那么通常情况下为该 Group 设置 6 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性。
- **在实际使用过程中一般不推荐设置大于总分区数的 Consumer 实例**。设置多余的实例只会浪费资源,而没有任何好处。
- 针对 Consumer GroupKafka 是怎么管理位移的呢?你还记得吧,消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在 Kafka 中这个位置信息有个专门的术语位移Offset
- 看上去该 Offset 就是一个数值而已,其实对于 Consumer Group 而言,它是一组 KV 对Key 是分区V 对应 Consumer 消费该分区的最新位移。
- 如果用 Java 来表示的话,你大致可以认为是这样的数据结构,即 Map<TopicPartition, Long>,其中 TopicPartition 表示一个分区,而 Long 表示位移的类型。
- Kafka 源码中并不是这样简单的数据结构,而是要比这个复杂得多
- Kafka 有新旧客户端 API 之分,那自然也就有新旧 Consumer 之分。老版本的 Consumer 也有消费者组的概念,它和我们目前讨论的 Consumer Group 在使用感上并没有太多的不同,只是**它管理位移的方式和新版本是不一样的**。
- 老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。
- **现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。**
- Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
- 发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。
- 于是,在新版本的 Consumer Group 中Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是让人既爱又恨的 __consumer_offsets。
- **新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中**。
- 最后,我们来说说 Consumer Group 端大名鼎鼎的重平衡,也就是所谓的 Rebalance 过程。我形容其为“大名鼎鼎”,从某种程度上来说其实也是“臭名昭著”,因为有关它的 bug 真可谓是此起彼伏,从未间断
- Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
- 比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
- 那么 Consumer Group 何时进行 Rebalance 呢Rebalance 的触发条件有 3 个。
- 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
- 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
- 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
- Rebalance 发生时Group 下所有的 Consumer 实例都会协调在一起共同参与。你可能会问,每个 Consumer 实例怎么知道应该消费订阅主题的哪些分区呢?这就需要分配策略的协助了。
- 当前 Kafka 默认提供了 3 种分配策略,每种策略都有一定的优势和劣势,我们今天就不展开讨论了
- 例子来说明一下 Consumer Group 发生 Rebalance 的过程。假设目前某个 Consumer Group 下有两个 Consumer比如 A 和 B当第三个成员 C 加入时Kafka 会触发 Rebalance并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:
- ![Rebalance的过程](pic/Rebalance的过程.png)
- 显然Rebalance 之后的分配依然是公平的,即每个 Consumer 实例都获得了 3 个分区的消费权。这是我们希望出现的情形。
- Rebalance 过程对 Consumer Group 消费过程有极大的影响
- 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
- 其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
- 例如实例 A 之前负责消费分区 1、2、3那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3而不是被重新分配其他的分区。这样的话实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
- 最后Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!
- 最好的解决方案就是避免 Rebalance 的发生。
- 补充
- 消费组中的消费者个数如果超过topic的分区数就会有消费者消费不到数据。但如果是同一个消费组里的两个消费者通过assign方法订阅了同一个TopicPartition是不是会有一个消费者不能消费到消息
- 如果使用assign则表明该consumer是独立consumerstandalone consumer它不属于任何消费者组。独立consumer可以订阅任何分区彼此之间也没有关系即两个独立consumer可以订阅并消费相同的分区
- 减少topic的分区数可以减少服务部署时rebalance的时间呢
- 减少consumer个数也有缩短rebalance。
- Kafka 社区重新设计了 Consumer Group 的位移管理方式”
- 0.9版本引入了新版本consumer。新版本将位移保存在__consumer_offsets中不需要额外配置。如果你想保存在外部的Zk集群中那么设置consumer端参数enable.auto.commit=false然后自己调用ZooKeeper API提交位移到Zk即可。
### 2.8 揭开神秘的“位移主题”面纱
## 3. Producer生产者
## 4. Consumer
## 5. Stream
## 6. Connect
## 7. Kafka集群部署与开发
## 8. Kafka集群监控、安全与最佳实践
## 9. Kafka面试点梳理