From 996fb6a09c241703b166a447d155bb1ec3f43111 Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Mon, 19 Sep 2022 13:38:41 +0800 Subject: [PATCH] =?UTF-8?q?[=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84](master):?= =?UTF-8?q?=20kafka=20Java=E7=94=9F=E4=BA=A7=E8=80=85=E6=98=AF=E5=A6=82?= =?UTF-8?q?=E4=BD=95=E7=AE=A1=E7=90=86TCP=E8=BF=9E=E6=8E=A5=E7=9A=84?= =?UTF-8?q?=EF=BC=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 2022年9月19日13:38:35 --- bigdata/kafka/README.md | 68 +++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/bigdata/kafka/README.md b/bigdata/kafka/README.md index 5d30f99..601c841 100644 --- a/bigdata/kafka/README.md +++ b/bigdata/kafka/README.md @@ -576,20 +576,70 @@ public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor producer = new KafkaProducer<>(props)) { + producer.send(new ProducerRecord(……), callback); + // + } +``` +- 这段代码使用了 Java 7 提供的 try-with-resource 特性,所以并没有显式调用 producer.close() 方法。无论是否显式调用 close 方法,所有生产者程序大致都是这个路数。 +- 当我们开发一个 Producer 应用时,生产者会向 Kafka 集群中指定的主题(Topic)发送消息,这必然涉及与 Kafka Broker 创建 TCP 连接。那么,Kafka 的 Producer 客户端是如何管理这些 TCP 连接的呢? +- 何时创建 TCP 连接? + - 生产者代码是什么时候创建 TCP 连接的。就上面的那段代码而言,可能创建 TCP 连接的地方有两处:Producer producer = new KafkaProducer(props) 和 producer.send(msg, callback)。你觉得连向 Broker 端的 TCP 连接会是哪里创建的呢?前者还是后者,抑或是两者都有? + - **在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。** +- 测试环境中的日志来说明这一点: +```txt + [2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084) +[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914) +[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084) +[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914) +[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068) +``` +- 如果不调用 send 方法,这个 Producer 都不知道给哪个主题发消息,它又怎么能知道连接哪个 Broker 呢?难不成它会连接 bootstrap.servers 参数指定的所有 Broker 吗? + - 解释一下 bootstrap.servers 参数。它是 Producer 的核心参数之一,指定了这个 Producer 启动时要连接的 Broker 地址。 + - 这里的“启动时”,代表的是 Producer 启动时会发起与这些 Broker 的连接。因此,如果你为这个参数指定了 1000 个 Broker 连接信息,那么很遗憾,你的 **Producer 启动时会首先创建与这 1000 个 Broker 的 TCP 连接**。 + - 在实际使用过程中,我并**不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中**,通常你指定 **3~4 台就足以**了。因为 **Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker**。 + - 在 KafkaProducer 实例被创建后以及消息被发送前,Producer 应用就开始创建与两台 Broker 的 TCP 连接了。 + - 你不需要了解 RecordAccumulator 类是做什么的,你只要知道它主要的数据结构是一个 ConcurrentMap。TopicPartition 是 Kafka 用来表示主题分区的 Java 对象,本身是不可变对象。而 RecordAccumulator 代码中用到 Deque 的地方都有锁的保护,所以基本上可以认定 RecordAccumulator 类是线程安全的。 +- **TCP 连接是在创建 KafkaProducer 实例时建立的。** +- 它只会在这个时候被创建吗? + - 当然不是!TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。 + - 当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。 +- Producer 更新集群元数据信息的两个场景 + - 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。 + - 场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。 + +- 何时关闭 TCP 连接? + - Producer 端关闭 TCP 连接的方式有两种:**一种是用户主动关闭;一种是 Kafka 自动关闭**。 + - 第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close() 方法来关闭。 + - 第二种是 Kafka 帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。 + 当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。 + - 在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。