You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

456 lines
42 KiB
Markdown

# Kafka
## 0. 目录
- Kafka Kafka基本概念及配置
Kafka 客户端实践及原理剖析
Kafka 内核及源码
Kafka 管理和监控
Kafka 流处理
## 1. Kafka基本概念及配置
- 官方QuickStart: https://kafka.apache.org/quickstart
- 官方文档: https://kafka.apache.org/documentation/
- 最新版本 3.2.0 【截止2022-7-20】
- 学习版本 kafka_2.11-2.4.0【前面是scala后面是kafka】 zookeeper-3.5.7【截止2022-7-20】
### 1.1 概念
- 概述
- Kafka 是分布式的流处理平台
- Kafka 是基于Zookeeper的分布式消息系统
- Kafka具有高吞吐率、高性能、实时、高可靠等特点
- Kafka 的组成概念
- Topic: 一个虚拟的概念, 由1到多个Partitions组成
- Partition: 实际消息存储单位
- Producer: 消息生产者
- Consumer: 消息消费者
-
### 1.2 安装Zookeeper(快捷)
```shell
# 修改配置文件
cp zoo_sample.cfg zoo.cfg
# 需要修改参数
dataDir=指定一个存储配置文件的目录
# 启动
./zkServer.sh start
# 连接测试
./zkCli.sh
```
### 1.3 安装Kafka
```shell
# 修改基本的配置文件
vim server.properties
# 需要修改的配置项
listeners=PLAINTEXT://[你的ip地址]:9092
advertised.listeners=PLAINTEXT://[你的ip地址]:9092
log.dirs=你的kafka的日志目录
zookeeper.connect=[你的ip地址]:2181
```
### 1.4 Kafka 的线上方案
- 操作系统
- 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的 快速数据传输特性。
- Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证, 千万不要应用于生产环境。
- 磁盘
- **普通的机械磁盘** 即可, Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘 最大的劣势,即随机读写操作慢。
- 追求性价比的公司可以**不搭建 RAID**,使用**普通磁盘组成存储空间即可**。 [Kafka 1.1开始正式支持JBOD了。再说Kafka本身在软件层面也提供了冗余的机制来对 抗磁盘损坏。]
- 磁盘容量
- 在规划磁盘容量时你需要考虑下面这几个元素:
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
- 例子:
- 假设你所在公司有个业务每天需要向 Kafka 集群发送 **1 亿条消息**,每条消息**保存两份**以防止数据丢失,另外消息默认**保存两周** 时间。现在假设消息的**平均大小是 1KB**?
- 每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空 间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息 数据还有其他类型的数据,
- 比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空 间,因此总的存储容量就是 220GB。既然要保存两周那么整体容量即为 220GB * 14 大约 3TB 左右。Kafka 支持数据的压缩,
假设压缩比是 0.75,那么最后你需要规划的存储 空间就是 0.75 * 3 = 2.25TB。
- 带宽
- 对于 Kafka 这种通过网络大量进行数据传输的框架而言,**带宽特别容易成为瓶颈**。事实 上,在我接触的真实案例当中,带宽资源不足导致 Kafka 出现性能问题的比例至少占 60% 以上。
如果你的环境中还涉及跨机房传输,那么情况可能就更糟了。
- 带宽也主要有 两种:1Gbps 的千兆网络和 10Gbps 的万兆网络,特别是千兆网络应该是一般公司网络的标准配置了。
- 与其说是带宽资源的规划,其实真正要规划的是所需的 Kafka 服务器的数量。
- 假设你公司 的机房环境是千兆网络,即 1Gbps现在你有个业务其业务目标或 SLA 是在 1 小时内处 理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
- 让我们来计算一下,由于带宽是 1Gbps即每秒处理 1Gb 的数据,假设每台 Kafka 服务 器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混布其他服务,毕竟真实环境中不建议这么做。
通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其 他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比 较合理的值,
也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源, 故通常要再额外预留出 2/3 的资源,
即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需 要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少 此值。 好了,有了 240Mbps
我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根 据这个目标,我们**每秒需要处理 2336Mb [1024*1024/3600*8] 的数据,除以 240约等于 10 台服务器**。
如果 消**息还需要额外复制两份,那么总的服务器台数还要乘以 3即 30 台**。
- Kafka 性能测试脚本
- kafka-producer-perf-test 脚本还不错kafka-consumer-perf-test有点难用
- 混部
- 一个机器上 部署一个zookeeper和一个kafka, 如果配置好**ZooKeeper事务日志**(比如设置好autopurge.purgeInterval及定期删除 snapshot等)它对IO的需求不大混布也是可以的。
---
- 最最最重要的集群参数配置
- Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题(Topic)级别的参数、JVM 端参数和操作系统级别的参数
- Broker 端参数
- 首先 Broker 是需要配置存储信息的,即 Broker 使用哪些 磁盘。那么针对存储信息的重要参数有以下这么几个:
- **log.dirs**: 这是非常重要的参数,指定了 Broker 需要 使用的若干个文件目录路径。要知道这个参数是没有默认 值的,这说明什么?这说明它**必须由你亲自指定**。
- log.dir: 注意这是 dir结尾没有 s说明它只能表示 单个路径,它是**补充上一个参数用的**。
- 这两个参数应该怎么设置呢?很简单,你只要设置 log.dirs即第一个参数就好了不要设置log.dir。而且更重要的是在线上生产环境中一定要为log.dirs配置 多个路径,
具体格式是一个 CSV 格式,也就是用逗号分隔 的多个路径,比 如/home/kafka1,/home/kafka2,/home/kafka3这 样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。
- 这样做有两个好处:
- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新 引入的强大功能。要知道在以前,只要 Kafka Broker 使 用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。 但是自 1.1 开始,这种情况被修正了,
坏掉的磁盘上的数 据会自动地转移到其他正常的磁盘上,而且 Broker 还能 正常工作。这个改进正是我们舍弃 RAID 方案的基 础:没有这种 Failover 的话,我们只能依靠 RAID 来提 供保障。
- 与 ZooKeeper 相关的设置
- 首先 ZooKeeper 是 做什么的呢?它是一个分布式协调框架,负责协调管理并保 存 Kafka 集群的所有元数据信息比如集群都有哪些Broker 在运行、创建了哪些 Topic每个 Topic 都有多少 分区以及这些分区的 Leader 副本都在哪些机器上等信息。
- Kafka 与 ZooKeeper 相关的最重要的参数当属 **zookeeper.connect**
- 这也是一个 CSV 格式的参数,比 如我可以指定它的值为 **zk1:2181,zk2:2181,zk3:2181**。2181 是 ZooKeeper 的默认端口。
- 如果我让**多个 Kafka 集群使用同一套 ZooKeeper 集群**,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概 念,类似于别名。
- 如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2那么两套集群的zookeeper.connect参数可以 这样指定:**zk1:2181,zk2:2181,zk3:2181/kafka1和 zk1:2181,zk2:2181,zk3:2181/kafka2**。
- **切记 chroot 只需要写一次,而且是加到最后的**。我经常碰到有人这样指 定: zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3这样的格式是不对的。
- 第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参 数:
- **listeners**: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
- advertised.listeners: 和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的, 就是说**这组监听器是 Broker 用于对外发布的**。
- [内网环境访问Kafka不需要配置这个参数常见的玩法是:你的Kafka Broker机器上配置了双网卡一 块网卡用于内网访问(即我们常说的内网IP);另一个块用 于外网访问。那么你可以配置listeners为内网IP advertised.listeners为外网IP。]
- ~~host.name/port~~: 列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
- 监听器的概念,从构成上来说,它是若干个逗 号分隔的三元组,**每个三元组的格式为<协议名称,主机 名,端口号>**。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密 传输等;
也可能是你自己定义的协议名字,比如 CONTROLLER: //localhost:9092。
- 一旦你自己定义了协议名称,你必须还要指定 listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议比如指定 listener.security.protocol.map=CONTROLLER:P LAINTEXT表示CONTROLLER这个自定义协议底层使用明文 不加密传输数据。
- 经常有人会问主机 名这个设置中我到底使用 IP 地址还是主机名。这里我给出 统一的建议:**最好全部使用主机名**,即 **Broker 端和 Client 端应用配置中全部填写主机名**。 Broker 源代码中也使用的 是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
- 第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数:
- auto.create.topics.enable:是否允许自动创建 Topic。
- [建议最好设置成 **false**,即不允许自动创建 Topic。每个部门被分配的 Topic 应该由运维严格把控,决不能允许自行创建任何 Topic。]
- unclean.leader.election.enable:是否允许 Unclean Leader 选举。
- [每个分区都有多个副本来提供高可用。 在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。建议你还是显式地把它设置成 **false** 吧]
- auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。
- [对生产环境影响非常大,设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满 足一定的条件才会发生。严格来说它与上一个参数中Leader 选举的最大不同在于,
它不是选 Leader而是换 Leader!比如 Leader A 一直表现得很好,但若 true则有可能 一段时间后 Leader A 就要被强行卸任换成 Leader B。建议你在生产环境中把这 个参数设置成 **false**。]
- 最后一组参数是数据留存方面的,即:
- log.retention.{hour|minutes|ms}:这是个“三 兄弟”,都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。
- [虽然 ms 设置有最高的优先级,但是 通常情况下我们还是设置 hour 级别的多一些,比如 log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使 用,那么这个值就要相应地调大。]
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
- [这个值默认是 -1 表明你想在这台 Broker 上保存多少数据都可以,至少在容 量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参 数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集 群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间]
- message.max.bytes:控制 Broker 能够接收的最大消息大小。
- [不能使用默认值的参数,默认的 1000012 太少了,还不到 1MB。实际场景中突 破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一 个比较大的值还是比较保险的做法。毕竟它只是一个标尺而 已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置 大一点也不会耗费什么磁盘空间的。]
- 补充参数
- gg.handler.kafkahandler.Mode = tx gg.handler.kafkahandler.Mode = op 这两个的差别。遇到时 dml 数据会丢失的情况。用的是 op 。
- 当设置成op单个数据库表的变更(插入、更新、删除) 会被当成一条Kafka消息发送;当设置成tx时数据库事务 所做的所有变更统一被封装进一条Kafka消息并在事务提 交后被发送。后者有事务性的保障,至少有原子性方面的保证,不会丢失部分 CDC 数据。
- Topic 级别参数
- Topic 级别参数会**覆盖全局 Broker 参数的值**,而每个 Topic 都能设置自己的参数值, 这就是所谓的 Topic 级别参数。
- **retention.ms**: 规定了该 Topic 消息被保存的时长。 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦 设置了这个值,它会覆盖掉 Broker 端的全局参数值。
- **retention.bytes**:规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1表示可以 无限使用磁盘空间。
- Topic 级别 参数的设置就是这种情况,我们有两种方式可以设置:
- 创建 Topic 时进行设置
- 修改 Topic 时设置
- 如何在创建 Topic 时设置这些参数
- 你的部门需要将交易数据发送到 Kafka 进行处理,需要保 存最近半年的交易数据,同时这些数据很大,通常都有几 MB但一般不会超过 5MB。现在让我们用以下命令来创建 Topic:
- bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replication-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880
- 请注意结尾处的--config设置我们就是在 config 后面指定了想要设置的 Topic 级别参数。
- 使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:
- bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760
- [你最好始终坚持使用第二种方式来设置并且在未来Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。]
- JVM 参数
- Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。
- 有条件的话至少使用 Java 8 吧, [Kafka 2.0已经不支持Java 7了2.1版本开始初步支持Java 11但不建议生产环境用11所以还是使用Java 8吧。]
- 堆大小这个参数至关重要。无脑给出一个通用的建议:将你的 JVM 堆大小设置成 **6GB** 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka说实话默认的 1GB 有点小,
毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例Heap Size 不能太小。
- [没有通用的标准只有一个最佳实践值6GB。最好还是监控一下实时的堆大小特别是**GC之后的live data大小通常将heapsize设置成其1.5~2倍就足以了**]
- JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7那么可以根据以下法则选择合适的垃圾回收器
- 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
- 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
- 当然了,如果你已经在使用 Java 8 了[G1是jdk9中默认的jdk8还是需要显式指定的]。在没有任何调优的情况下G1 表现得要比 CMS 出色,主要体现在更少的 Full GC需要调整的参数更少等所以使用 G1 就好了。
- Java8默认的新生代垃圾回收器是UseParallelGC可以用-XX:+PrintCommandLineFlags -version查看如果显示指定 -XX:+UseCurrentMarkSweepGC 的话,会默认开启 -XX:+UseParallelGC
- 确定好了要设置的 JVM 参数,我们该如何为 Kafka 进行设置呢?
- 只需要设置下面这两个环境变量即可:
- KAFKA_HEAP_OPTS指定堆大小。
- KAFKA_JVM_PERFORMANCE_OPTS指定 GC 参数。
- 比如你可以这样启动 Kafka Broker即在启动 Kafka Broker 之前,先设置上这两个环境变量:
- $> **export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g**
- $> **export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true**
- $> **bin/kafka-server-start.sh config/server.properties**
- 操作系统参数
- Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个:
- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间
- 首先是 **ulimit -n**
- 觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如 **ulimit -n 1000000**
- 但不设置的话后果很严重比如你会经常看到“Too many open files”的错误。
- 其次是文件系统类型的选择。
- 这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,**XFS** 的性能要强于 ext4所以生产环境最好还是使用 XFS。对了最近有个 Kafka 使用 **ZFS** 的数据报告,貌似性能更加强劲,有条件的话不妨一试。
- 第三是 swap 的调优
- 网上很多文章都提到设置其为 0将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0当物理内存耗尽时操作系统会触发 OOM killer 这个组件,
它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,
我**个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1**。
### 1.5 kafka的基本操作
```shell
# 1、启动Kafka - todo 这个命令会让控制台有日志生成不太方便
bin/kafka-server-start.sh config/server.properties &
# 2、停止Kafka
bin/kafka-server-stop.sh
# 3、创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiangzh-topic
# 4、查看已经创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 5、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.220.128:9092 --topic jiangzh-topic
# 6、接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic jiangzh-topic --from-beginning
# {"orderId":"002","price":"80"}
```
```shell
# 【旧版】
kafka-topics.sh --zookeeper 172.16.26.183:2181 --list
# 【新版】
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --list
# zookeeper is not a recognized option主要原因是 Kafka 版本过高,命令不存在)
# 创建topic主题
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --create --topic topic1 --partitions 1 --replication-factor 3
# --create 命令后 --topic 为创建topic 并指定 topic name
# --partitions 为指定分区数量
# --replication-factor 为指定副本集数量
# 向kafka集群发送数据
# 【无key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1
# 【有key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --property parse.key=true
# 默认消息键与消息值间使用“Tab键”进行分隔切勿使用转义字符(\t)
# kafka命令接受数据
kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning
# kafka查看消费进度当我们需要查看一个消费者组的消费进度时则使用下面的命令(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】
kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1
```
## 2. 客户端实践及原理剖析
### 2.1 生产者消息分区机制原理剖析
- 为什么分区?
- Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
- 主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
- 为什么使用分区的概念而不是直接使用多个主题呢?
- 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了**实现系统的高伸缩性Scalability**
- 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能**独立地执行各自分区的读写请求处理**。
- 并且,我们还可以通过**添加新的节点机器来增加整体系统的吞吐量**。
- 实际上分区的概念以及分区数据库早在 1980 年就已经有大牛们在做了,比如那时候有个叫 Teradata 的数据库就引入了**分区的概念**。
- Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard而在 HBase 中则叫 Region在 Cassandra 中又被称作 vnode
- 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如**实现业务级别的消息顺序的问题**
- 都有哪些分区策略?
- 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
- 如果要自定义分区策略你需要显式地配置生产者端的参数partitioner.class
- 在编写生产者程序时你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。
- 这个接口也很简单只定义了两个方法partition()和close(),通常你只需要实现最重要的 partition 方法。
- 方法签名int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- 这里的topic、key、keyBytes、value和valueBytes都属于消息数据cluster则是集群信息比如当前 Kafka 集群共有多少主题、多少 Broker 等)
- Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
- 只要你自己的**实现类定义好了 partition 方法同时设置partitioner.class参数为你自己实现类的 Full Qualified Name**,那么生产者程序就会按照你的代码逻辑对消息进行分区。
- 比较常见的分区策略
- **轮询策略**
- 也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0第二条被发送到分区 1第三条被发送到分区 2以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0就像下面这张图展示的那样。
- ![轮询策略](pic/轮询策略.png)
- 这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API **默认提供的分区策略**。如果你未指定partitioner.class参数那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
- [轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。]
- **随机策略**
- 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
- ![随机策略](pic/随机策略.png)
- 如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return ThreadLocalRandom.current().nextInt(partitions.size());
- 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
- 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以**如果追求数据的均匀分布,还是使用轮询策略比较好**。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
- **按消息键保序策略**
- 也称 Key-ordering 策略。这个名词是我自己编的Kafka 官网上并无这样的提法。
- Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。
特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,
**由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略**,如下图所示。
- ![按消息键保序策略](pic/按消息键保序策略.png)
- 实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return Math.abs(key.hashCode()) % partitions.size();
- 前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:**如果指定了 Key那么默认实现按消息键保序策略如果没有指定 Key则使用轮询策略。**
- 如何实现消息的顺序问题?
- 企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。
- 当时企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。
这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。
- 后来经过了解和调研,我发现这种具有因果关系的消息都有一定的特点,比如在**消息体中都封装了固定的标志位**,改进建议他们**对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区**
这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。
- 这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把**标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想**。
- 其他分区策略
- 其实还有一种比较常见的,即所谓的**基于地理位置的分区策略**。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
- 假设的所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在考虑在南方找个城市(比如广州)再创建一个机房;
另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在广州。
- 计划为每个新注册用户提供一份注册礼品,比如南方的用户可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。
如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
- 但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。
换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!
- 此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
- 我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。
- 补充问题
- 在消息重试的时候分区策略会重新再计算一次吗比如一开始选择到5号分区但是5号分区有问题导致重试重试的时候可以重试发送到别的分区上吗
- 不会的。消息重试只是简单地将消息重新发送到之前的分区
- 在看kafka-client生产者默认分区源码时看到了cluster.partitionsForTopic和cluster.availablePartitionsForTopic请问什么时候分区是available什么时候是不unavailable的
- 分区没有leader的时候就是unavailable了。某些操作会导致瞬间没有leader比如分区reassign、换leader等
- RocketMQ与Kafka的主要区别
- Kafka吞吐量大多是面向大数据场景。RocketMQ吞吐量也很强 不过它号称是金融业务级的消息中间件,也就是说可以用于实际的业务系统;
- RocketMQ毕竟是阿里出品在国内技术支持力度要比Kafka强
- Kafka现在主要发力StreamingRocketMQ在流处理这块表现如何我不太清楚至少streaming不是它现阶段的主要卖点。
- 其他方面这两者确实都差不多
- 广州机房怎么消费广州partition的数据consumer如何指定消费的partition。
- 使用这个方法consumer.assign()直接消息指定分区
### 2.2 生产者压缩算法面面观
- 说起压缩compression我相信你一定不会感到陌生。它秉承了**用时间去换空间的经典 trade-off 思想**,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。
- 怎么压缩?
- Kafka 是如何压缩消息的呢?要弄清楚这个问题,就要从 Kafka 的消息格式说起了。目前 **Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本**。V2 版本是 Kafka 0.11.0.0 中正式引入的。
- 不论是哪个版本Kafka 的消息层次都分为两层:**消息集合message set以及消息message**。**一个消息集合中包含若干条日志项record item而日志项才是真正封装消息的地方**。
Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka **通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作**。
- V2 版本主要是针对 V1 版本的一些弊端做了修正
- 一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。
- 在 V2 版本中,消息的 CRC 校验工作就被移到了消息集合这一层。
- V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。
- 之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。
- V2 版本都比 V1 版本节省磁盘空间。
- 何时压缩?
- 在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
- 生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。
- 下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
```
- 这里比较关键的代码行是 props.put(“compression.type”, “gzip”),它表明该 Producer 的压缩算法使用的是 GZIP。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
- 在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?其实大部分情况下 **Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改**,但这里的“大部分情况”也是要满足一定条件的。
- 有两种例外情况就可能让 Broker 重新压缩消息。
- 情况一Broker 端指定了和 Producer 端不同的压缩算法。
- Producer 说:“我要使用 GZIP 进行压缩。”
- Broker 说:“不好意思,我这边接收的消息必须使用 Snappy 算法进行压缩。”
- Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type和上面那个例子中的同名。但是这个**参数的默认值是 producer**,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。
可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,**通常表现为 Broker 端 CPU 使用率飙升**。
- 情况二Broker 端发生了消息格式转换。
- 在一个生产环境中Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,
它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
- 所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。如果有兴趣你可以深入地了解下 Zero Copy 的原理。
- 何时解压缩?
- 通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时Broker 依然原样发送出去,**当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息**。
- Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。**Kafka 会将启用了哪种压缩算法封装进消息集合中**,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
- 住这句话:**Producer 端压缩、Broker 端保持、Consumer 端解压缩。**
- 除了在 Consumer 端解压缩Broker 端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,**目的就是为了对消息执行各种验证**。
我们必须承认**这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言**。
- 国内京东的小伙伴们刚刚向社区提出了一个 bugfix建议去掉因为做消息校验而引入的解压缩。据他们称**去掉了解压缩之后Broker 端的 CPU 使用率至少降低了 50%**。
- 目前社区并未采纳这个建议,原因就是这种消息校验是非常重要的,不可盲目去之。毕竟先把事情做对是最重要的,**在做对的基础上,再考虑把事情做好做快**。
- 你也可以思考一下,是否有一个两全其美的方案,既能避免消息解压缩也能对消息执行校验。
- 各种压缩算法对比
- 在 Kafka 2.1.0 版本之前Kafka 支持 3 种压缩算法:**GZIP、Snappy 和 LZ4**。从 2.1.0 开始Kafka 正式支持 **Zstandard** 算法(简写为 zstd。它是 Facebook 开源的一个压缩算法能够提供超高的压缩比compression ratio
- 对了,看一个压缩算法的优劣,有两个重要的指标:
- 一个指标是**压缩比**,原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5显然压缩比越高越好
- 另一个指标就是**压缩 / 解压缩吞吐量**,比如每秒能压缩或解压缩多少 MB 的数据。同样地,吞吐量也是越高越好。
- Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:
- ![Zstandard压缩算法对比](pic/Zstandard压缩算法对比.png)
- zstd 算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩。
- 反观 LZ4 算法,它在吞吐量方面则是毫无疑问的执牛耳者。
- 在实际使用中GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。
- 但对于 Kafka 而言,它们的性能测试结果却出奇得一致,
- 即**在吞吐量方面LZ4 > Snappy > zstd 和 GZIP**
- 而**在压缩比方面zstd > LZ4 > GZIP > Snappy**。
- 具体到**物理资源**,使用 **Snappy 算法占用的网络带宽最多zstd 最少**,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
-**CPU 使用率**方面,各个算法表现得差不多,**只是在压缩时 Snappy 算法使用的 CPU 较多一些**
- 而**在解压缩时 GZIP 算法则可能使用更多的 CPU**。
- 最佳实践
- 首先来说压缩。何时启用压缩是比较合适的时机呢?
- 你现在已经知道 Producer 端完成的压缩,那么启用压缩的一个条件就是 **Producer 程序运行机器上的 CPU 资源要很充足**。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
- 除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。
- 事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,**带宽可是比 CPU 和内存还要珍贵的稀缺资源**,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。
- **如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗**。
- 其次说说解压缩。其实也没什么可说的。
- 一旦启用压缩,解压缩是不可避免的事情。
- 这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。
- 补充问题
- 正常情况下broker端会原样保存起来但是为了检验需要解压缩。该怎么去理解这个过程呢broker端解压缩以后还会压缩还原吗
- 它只是解压缩读取而已,不会将解压缩之后的数据回写到磁盘。另外就像我置顶的留言那样,目前社区已经接纳了京东小伙伴的修改,貌似可以绕过这部分解压缩了.
- 规避了broker端为执行校验而做的解压缩操作代码也merge进了2.4版本。有兴趣的同学可以看一下:
- https://issues.apache.org/jira/browse/KAFKA-8106
- 消息层次、消息集合、消息、日志项这些概念与它们之间的关系
- 消息批次RecordBatch里面包含若干条消息record)。 你可以认为消息批次和消息集合是等价的,消息和日志项是等价的。这样消息层次有两层:外层是消息批次(或消息集合);里层是消息(或日志项)。
Producer以recordbatch为单位发送消息对于V2版本一个batch中通常包含多条消息。在V2版本中在batch层面计算CRC值在V1版本中每条消息都要计算CRC值。
## 3. Producer生产者
## 4. Consumer
## 5. Stream
## 6. Connect
## 7. Kafka集群部署与开发
## 8. Kafka集群监控、安全与最佳实践
## 9. Kafka面试点梳理