[代码重构](master): 揭开神秘的“位移主题”面纱

2022年9月19日17:05:36
master
土豆兄弟 2 years ago
parent b4be3bb5e5
commit d931733337

@ -757,13 +757,68 @@ try {
- 0.9版本引入了新版本consumer。新版本将位移保存在__consumer_offsets中不需要额外配置。如果你想保存在外部的Zk集群中那么设置consumer端参数enable.auto.commit=false然后自己调用ZooKeeper API提交位移到Zk即可。 - 0.9版本引入了新版本consumer。新版本将位移保存在__consumer_offsets中不需要额外配置。如果你想保存在外部的Zk集群中那么设置consumer端参数enable.auto.commit=false然后自己调用ZooKeeper API提交位移到Zk即可。
### 2.8 揭开神秘的“位移主题”面纱 ### 2.8 揭开神秘的“位移主题”面纱
- Kafka 中神秘的内部主题Internal Topic__consumer_offsets。
- __consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。为了方便今天的讨论我将统一使用位移主题来指代 __consumer_offsets。需要注意的是它有两个下划线哦。
- 新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,**__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息**。
- 它要求这个提交过程不仅要实现高持久性还要支持高频的写操作。显然Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。
- 位移主题就是普通的 Kafka 主题。你可以手动地创建它、修改它,甚至是删除它。只不过,它同时也是一个内部主题,大部分情况下,你其实并不需要“搭理”它,也不用花心思去管理它,把它丢给 Kafka 就完事了。
- 虽说位移主题是一个普通的 Kafka 主题,但它的**消息格式却是 Kafka 自己定义的** 用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。
- 事实上Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。你千万不要自己写个 Producer 随意向该主题发送消息。
- 首先从 Key 说起。
- 一个 Kafka 集群中的 Consumer 数量会有很多,既然这个主题保存的是 Consumer 的位移数据,那么消息格式中必须要有字段来标识这个位移数据是哪个 Consumer 的。这种数据放在哪个字段比较合适呢?显然放在 Key 中比较合适。
- Group ID ,它能够标识唯一的 Consumer Group。
- 除了 Consumer GroupKafka 还支持独立 Consumer也称 Standalone Consumer。它的运行机制与 Consumer Group 完全不同,但是位移管理的机制却是相同的。因此,即使是 Standalone Consumer也有自己的 Group ID 来标识它自己,所以也适用于这套消息格式。
- Key 中保存了 Group ID但是只保存 Group ID 就可以了吗别忘了Consumer 提交位移是在分区层面上进行的即它提交的是某个或某些分区的位移那么很显然Key 中还应该保存 Consumer 要提交位移的分区。
- 结论
- 位移主题的 Key 中应该保存 3 部分内容:<Group ID >
- 消息体的设计
- 也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。
- 保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。
- 位移主题的消息格式可不是只有这一种。事实上,它有 3 种消息格式。除了刚刚我们说的这种格式,还有 2 种格式:
- 用于保存 Consumer Group 信息的消息。
- 用于删除 Group 过期位移甚至是删除 Group 的消息。
- 第 1 种格式非常神秘,以至于你几乎无法在搜索引擎中搜到它的身影。不过,你只需要记住它是用来注册 Consumer Group 的就可以了。
- 第 2 种格式相对更加有名一些。它有个专属的名字tombstone 消息,即墓碑消息,也称 delete mark。下次你在 Google 或百度中见到这些词,不用感到惊讶,它们指的是一个东西。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null即空消息体。
- 何时会写入这类消息呢?
- 一旦某个 Consumer Group 下的所有 Consumer 实例都停止了而且它们的位移数据都已被删除时Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。
- 位移主题是怎么被创建的。
- 通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时Kafka 会自动创建位移主题。我们说过,位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。
但如果是 Kafka 自动创建的,分区数是怎么设置的呢?这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50因此 Kafka 会自动创建一个 50 分区的位移主题。
如果你曾经惊讶于 Kafka 日志路径下冒出很多 __consumer_offsets-xxx 这样的目录,那么现在应该明白了吧,这就是 Kafka 自动帮你创建的位移主题啊。
- 位移主题是怎么被创建的。
- 通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时Kafka 会自动创建位移主题。我们说过,位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。但如果是 Kafka 自动创建的,分区数是怎么设置的呢?
这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50因此 Kafka 会自动创建一个 50 分区的位移主题。如果你曾经惊讶于 Kafka 日志路径下冒出很多 __consumer_offsets-xxx 这样的目录,
那么现在应该明白了吧,这就是 Kafka 自动帮你创建的位移主题啊。
- **如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50副本数是 3**。
- 你也可以选择手动创建位移主题。
- 具体方法就是,在 Kafka 集群尚未启动任何 Consumer 之前,使用 Kafka API 创建它。手动创建的好处在于,你可以创建满足你实际场景需要的位移主题。比如很多人说 50 个分区对我来讲太多了,我不想要这么多分区,那么你可以自己创建它,不用理会 offsets.topic.num.partitions 的值。
- **让 Kafka 自动创建比较好。目前 Kafka 源码中有一些地方硬编码了 50 分区数,因此如果你自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各种奇怪的问题。**
- 目前 Kafka Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。
- Consumer 端有个参数叫 enable.auto.commit如果值是 true则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。
但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。
- 很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false作为 Consumer 应用开发的你就要承担起位移提交的责任。
Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时Kafka 会向位移主题写入相应的消息。
- 如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。
- Kafka 是怎么删除位移主题中的过期消息的呢?
- 答案就是 Compaction。国内很多文献都将其翻译成压缩我个人是有一点保留意见的。在英语中压缩的专有术语是 Compression它的原理和 Compaction 很不相同,我更倾向于翻译成压实,或干脆采用 JVM 垃圾回收中的术语:整理。
- Kafka 使用Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2如果 M1 的发送时间早于 M2那么 M1 就是过期消息。
- Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。
- ![ Compact 过程]( pic/Compact过程.png)
- 图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。
- **Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据**
- 这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题如果你的环境中也有这个问题我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
- 补充
- consumer端 日常业务发版呢那每次发版需要重启consumer不是也会导致Rebalance这个如何规避
- 可以考虑使用standalone consumer否则group机制无法避免
- 具体是从哪个版本开始位移数据开始默认的不存在zk而是存在自己内部了 如果验证目前使用的环境位移是存在内部还是zk上 什么场景下适合使用自动提交位移?
- 1.0.9 推荐保存在Kafka内部不在乎重复消费
- offset topic是在coordinator对应的broker上创建且只创建一次是么
- offset topic在整个集群上被创建出来并且只会创建一次
- consumer 是如何从这个位移主题中拿到曾经属于自己组的offset呢
- 首先找到对应的CoordinatorCoordinator保存了这些数据然后consumer向Coordinator发送请求去请求这些数据
### 2.9
## 3. Producer生产者 ## 3. Producer生产者

Binary file not shown.

After

Width:  |  Height:  |  Size: 159 KiB

Loading…
Cancel
Save