From 5b8e063ef1a0a60a8b76182901e3f6b006c87e0b Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Sun, 18 Sep 2022 22:59:23 +0800 Subject: [PATCH] =?UTF-8?q?[=E6=96=87=E6=A1=A3=E4=BF=AE=E6=94=B9](master):?= =?UTF-8?q?=20=E5=AE=A2=E6=88=B7=E7=AB=AF=E9=83=BD=E6=9C=89=E5=93=AA?= =?UTF-8?q?=E4=BA=9B=E4=B8=8D=E5=B8=B8=E8=A7=81=E4=BD=86=E6=98=AF=E5=BE=88?= =?UTF-8?q?=E9=AB=98=E7=BA=A7=E7=9A=84=E5=8A=9F=E8=83=BD=EF=BC=9F=20?= =?UTF-8?q?=E5=91=A8=E6=97=A5=E5=AD=A6=E4=B9=A0=202022=E5=B9=B409=E6=9C=88?= =?UTF-8?q?18=E6=97=A522:59:18?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bigdata/kafka/README.md | 110 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 108 insertions(+), 2 deletions(-) diff --git a/bigdata/kafka/README.md b/bigdata/kafka/README.md index 7f0bcb5..5d30f99 100644 --- a/bigdata/kafka/README.md +++ b/bigdata/kafka/README.md @@ -467,10 +467,116 @@ kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --grou - 碰到的实际场景,影响还是很大的。acks=all时,大部分的请求处理延时都花在了follower同步上。 是的,acks=all表明所有ISR中的副本都要同步。 ### 2.4 客户端都有哪些不常见但是很高级的功能? +- 什么是拦截器? + - 基本思想就是允许应用程序在**不修改逻辑的情况下**,**动态地实现**一组可**插拔的事件处理逻辑链**。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。 + +- Kafka 拦截器 + - Kafka **拦截器分为生产者拦截器和消费者拦截器**。 + - 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。 + 值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,**Kafka 会按照添加顺序依次执行拦截器逻辑**。 + - Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 **interceptor.classes**,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。 + - 假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor, + 那么你需要按照以下方法在 Producer 端指定拦截器: +```java +Properties props = new Properties(); +List interceptors = new ArrayList<>(); +interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1 +interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2 +props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); +``` +- 我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢? + - 这两个类以及你自己编写的所有 Producer 端拦截器实现类都**要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口**。该接口是 Kafka 提供的,里面有两个核心的方法。 + - onSend:该方法会在消息发送之前被调用。如果你想在**发送之前**对消息“美美容”,这个方法是你唯一的机会。 + - onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?**onAcknowledgement 的调用要早于 callback 的调用**。 + 值得注意的是,**这个方法和 onSend 不是在同一个线程中被调用的**,因此如果你在这两个方法中调用了某个共享可变对象,**一定要保证线程安全哦**。 + 还有一点很重要,这个方法处在 Producer 发送的主路径中,所以**最好别放一些太重的逻辑进去**,否则你会发现你的 Producer TPS 直线下降。 + - 同理,指定消费者拦截器也是同样的方法,只是具体的实现类要**实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口**,这里面也有两个核心方法。 + - onConsume:该方法在消息返回给 Consumer 程序**之前**调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。 + - onCommit:Consumer 在**提交位移之后调用**该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。 + - 一定要注意的是,指定拦截器类时要指定它们的**全限定名**,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。 + +- 典型使用场景 + - 其实,跟很多拦截器的用法相同,**Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计**等多种功能在内的场景。 + - Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去**追踪集群间消息的流转路径**。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。 + - 通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够**从具体的消息层面上去收集这些数据**。这就是 Kafka 拦截器的一个非常典型的使用场景。 + - 我们再来看看消息审计(message audit)的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要**涉及多租户以及消息审计的功能**。 + - 作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。 + 一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。 + +- 案例分享 + - 通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。 + - 某个业务只有一个 Producer 和一个 Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前 Kafka 并没有提供这种端到端的延时统计。 + - 既然是要计算总延时,**那么一定要有个公共的地方来保存它**,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中。 + - 实现生产者拦截器 +```java +public class AvgLatencyProducerInterceptor implements ProducerInterceptor { + + private Jedis jedis; // 省略 Jedis 初始化 + + @Override + public ProducerRecord onSend(ProducerRecord record) { + jedis.incr("totalSentMessage"); + return record; + } + + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + } + + + @Override + public void close() { + } + + + @Override + public void configure(Map configs) { + } +``` +- 上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。 +- 消费者端的拦截器实现,代码如下: +```java +public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor { + + + private Jedis jedis; // 省略 Jedis 初始化 + + + @Override + public ConsumerRecords onConsume(ConsumerRecords records) { + long lantency = 0L; + for (ConsumerRecord record : records) { + lantency += (System.currentTimeMillis() - record.timestamp()); + } + jedis.incrBy("totalLatency", lantency); + long totalLatency = Long.parseLong(jedis.get("totalLatency")); + long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage")); + jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs)); + return records; + } + + + @Override + public void onCommit(Map offsets) { + } + + + @Override + public void close() { + } + + + @Override + public void configure(Map configs) { +``` +- 在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用**当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中**。 + 之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。 +- 创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。 + 这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。 - - +### 2.5