[代码重构](master): kafka Java生产者是如何管理TCP连接的?

2022年9月19日13:38:35
master
土豆兄弟 2 years ago
parent b91d7dd6fb
commit 996fb6a09c

@ -576,20 +576,70 @@ public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String
- 创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。 - 创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。
这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。 这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。
### 2.5 ### 2.5 Java生产者是如何管理TCP连接的
- 为何采用 TCP
- 从社区的角度来看,在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如**多路复用请求以及同时轮询多个连接**的能力。
- 所谓的多路复用请求,即 multiplexing request是指将两个或多个数据流合并到底层单一物理连接中的过程。**TCP 的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流**。
其实严格来说,**TCP 并不能多路复用,它只是提供可靠的消息交付语义保证**,比如自动重传丢失的报文。
- 作为一个基于报文的协议TCP 能够被用于多路复用连接场景的前提是,上层的应用协议(比如 HTTP允许发送多条消息。
- 除了 TCP 提供的这些高级功能有可能被 Kafka 客户端的开发人员使用之外,社区还发现,**目前已知的 HTTP 库在很多编程语言中都略显简陋**。
- 基于这两个原因Kafka 社区决定采用 TCP 协议作为所有请求通信的底层协议。
- Kafka 生产者程序概览
- Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。
- 第 1 步:构造生产者对象所需的参数对象。
- 第 2 步:利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
- 第 3 步:使用 KafkaProducer 的 send 方法发送消息。
- 第 4 步:调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。
- 代码示例:
```java
Properties props = new Properties ();
props.put("参数 1", "参数 1 的值")
props.put("参数 2", "参数 2 的值")
//
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<String, String>(……), callback);
//
}
```
- 这段代码使用了 Java 7 提供的 try-with-resource 特性,所以并没有显式调用 producer.close() 方法。无论是否显式调用 close 方法,所有生产者程序大致都是这个路数。
- 当我们开发一个 Producer 应用时,生产者会向 Kafka 集群中指定的主题Topic发送消息这必然涉及与 Kafka Broker 创建 TCP 连接。那么Kafka 的 Producer 客户端是如何管理这些 TCP 连接的呢?
- 何时创建 TCP 连接?
- 生产者代码是什么时候创建 TCP 连接的。就上面的那段代码而言,可能创建 TCP 连接的地方有两处Producer producer = new KafkaProducer(props) 和 producer.send(msg, callback)。你觉得连向 Broker 端的 TCP 连接会是哪里创建的呢?前者还是后者,抑或是两者都有?
- **在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。**
- 测试环境中的日志来说明这一点:
```txt
[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)
```
- 如果不调用 send 方法,这个 Producer 都不知道给哪个主题发消息,它又怎么能知道连接哪个 Broker 呢?难不成它会连接 bootstrap.servers 参数指定的所有 Broker 吗?
- 解释一下 bootstrap.servers 参数。它是 Producer 的核心参数之一,指定了这个 Producer 启动时要连接的 Broker 地址。
- 这里的“启动时”,代表的是 Producer 启动时会发起与这些 Broker 的连接。因此,如果你为这个参数指定了 1000 个 Broker 连接信息,那么很遗憾,你的 **Producer 启动时会首先创建与这 1000 个 Broker 的 TCP 连接**
- 在实际使用过程中,我并**不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中**,通常你指定 **34 台就足以**了。因为 **Producer 一旦连接到集群中的任一台 Broker就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker**
- 在 KafkaProducer 实例被创建后以及消息被发送前Producer 应用就开始创建与两台 Broker 的 TCP 连接了。
- 你不需要了解 RecordAccumulator 类是做什么的,你只要知道它主要的数据结构是一个 ConcurrentMap<TopicPartition, Deque>。TopicPartition 是 Kafka 用来表示主题分区的 Java 对象,本身是不可变对象。而 RecordAccumulator 代码中用到 Deque 的地方都有锁的保护,所以基本上可以认定 RecordAccumulator 类是线程安全的。
- **TCP 连接是在创建 KafkaProducer 实例时建立的。**
- 它只会在这个时候被创建吗?
- 当然不是TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。
- 当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地当要发送消息时Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。
- Producer 更新集群元数据信息的两个场景
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000即 5 分钟也就是说不管集群那边是否有变化Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
- 何时关闭 TCP 连接?
- Producer 端关闭 TCP 连接的方式有两种:**一种是用户主动关闭;一种是 Kafka 自动关闭**。
- 第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close() 方法来关闭。
- 第二种是 Kafka 帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1TCP 连接将成为永久长连接。
当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive因此 keepalive 探活机制还是会遵守的。
- 在第二种方式中TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。

Loading…
Cancel
Save