[文档修改](master): 生产者消息分区机制原理剖析

周六学习 2022年09月17日23:53:36
master
土豆兄弟 2 years ago
parent 74d69ea918
commit 93e15fecfd

@ -1,6 +1,18 @@
# Kafka
## 0. 目录
- Kafka Kafka基本概念及配置
Kafka 客户端实践及原理剖析
Kafka 内核及源码
Kafka 管理和监控
Kafka 流处理
## 1. Kafka基本概念及配置
@ -239,9 +251,95 @@ kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --grou
```
## 2. 客户端实践及原理剖析
### 2.1 生产者消息分区机制原理剖析
- 为什么分区?
- Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
- 主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
- 为什么使用分区的概念而不是直接使用多个主题呢?
- 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了**实现系统的高伸缩性Scalability**
- 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能**独立地执行各自分区的读写请求处理**。
- 并且,我们还可以通过**添加新的节点机器来增加整体系统的吞吐量**。
- 实际上分区的概念以及分区数据库早在 1980 年就已经有大牛们在做了,比如那时候有个叫 Teradata 的数据库就引入了**分区的概念**。
- Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard而在 HBase 中则叫 Region在 Cassandra 中又被称作 vnode
- 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如**实现业务级别的消息顺序的问题**
- 都有哪些分区策略?
- 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
- 如果要自定义分区策略你需要显式地配置生产者端的参数partitioner.class
- 在编写生产者程序时你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。
- 这个接口也很简单只定义了两个方法partition()和close(),通常你只需要实现最重要的 partition 方法。
- 方法签名int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- 这里的topic、key、keyBytes、value和valueBytes都属于消息数据cluster则是集群信息比如当前 Kafka 集群共有多少主题、多少 Broker 等)
- Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
- 只要你自己的**实现类定义好了 partition 方法同时设置partitioner.class参数为你自己实现类的 Full Qualified Name**,那么生产者程序就会按照你的代码逻辑对消息进行分区。
- 比较常见的分区策略
- **轮询策略**
- 也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0第二条被发送到分区 1第三条被发送到分区 2以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0就像下面这张图展示的那样。
- ![轮询策略](pic/轮询策略.png)
- 这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API **默认提供的分区策略**。如果你未指定partitioner.class参数那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
- [轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。]
- **随机策略**
- 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
- ![随机策略](pic/随机策略.png)
- 如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return ThreadLocalRandom.current().nextInt(partitions.size());
- 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
- 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以**如果追求数据的均匀分布,还是使用轮询策略比较好**。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
- **按消息键保序策略**
- 也称 Key-ordering 策略。这个名词是我自己编的Kafka 官网上并无这样的提法。
- Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。
特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,
**由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略**,如下图所示。
- ![按消息键保序策略](pic/按消息键保序策略.png)
- 实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return Math.abs(key.hashCode()) % partitions.size();
- 前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:**如果指定了 Key那么默认实现按消息键保序策略如果没有指定 Key则使用轮询策略。**
- 如何实现消息的顺序问题?
- 企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。
- 当时企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。
这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。
- 后来经过了解和调研,我发现这种具有因果关系的消息都有一定的特点,比如在**消息体中都封装了固定的标志位**,改进建议他们**对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区**
这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。
- 这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把**标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想**。
- 其他分区策略
- 其实还有一种比较常见的,即所谓的**基于地理位置的分区策略**。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
- 假设的所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在考虑在南方找个城市(比如广州)再创建一个机房;
另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在广州。
- 计划为每个新注册用户提供一份注册礼品,比如南方的用户可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。
如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
- 但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。
换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!
- 此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码:
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
- 我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。
- 补充问题
- 在消息重试的时候分区策略会重新再计算一次吗比如一开始选择到5号分区但是5号分区有问题导致重试重试的时候可以重试发送到别的分区上吗
- 不会的。消息重试只是简单地将消息重新发送到之前的分区
- 在看kafka-client生产者默认分区源码时看到了cluster.partitionsForTopic和cluster.availablePartitionsForTopic请问什么时候分区是available什么时候是不unavailable的
- 分区没有leader的时候就是unavailable了。某些操作会导致瞬间没有leader比如分区reassign、换leader等
- RocketMQ与Kafka的主要区别
- Kafka吞吐量大多是面向大数据场景。RocketMQ吞吐量也很强 不过它号称是金融业务级的消息中间件,也就是说可以用于实际的业务系统;
- RocketMQ毕竟是阿里出品在国内技术支持力度要比Kafka强
- Kafka现在主要发力StreamingRocketMQ在流处理这块表现如何我不太清楚至少streaming不是它现阶段的主要卖点。
- 其他方面这两者确实都差不多
- 广州机房怎么消费广州partition的数据consumer如何指定消费的partition。
- 使用这个方法consumer.assign()直接消息指定分区
### 2.2 生产者压缩算法面面观
## 2. Kafka客户端操作

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Loading…
Cancel
Save