You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
土豆兄弟 ea784ed0d0 [文档修改](master): 无消息丢失配置怎么实现
周日学习 2022年09月18日22:44:17
2 years ago
..
file [新增功能](master): 更新了文档 2 years ago
pic [文档修改](master): 无消息丢失配置怎么实现 2 years ago
springboot-kafka [新增功能](master): springboot-rocketmq整合 2 years ago
.gitignore [新增功能](master): 更新了文档 2 years ago
README.md [文档修改](master): 无消息丢失配置怎么实现 2 years ago

README.md

Kafka

0. 目录

  • Kafka Kafka基本概念及配置

    Kafka 客户端实践及原理剖析

    Kafka 内核及源码

    Kafka 管理和监控

    Kafka 流处理

1. Kafka基本概念及配置

1.1 概念

  • 概述
    • Kafka 是分布式的流处理平台
    • Kafka 是基于Zookeeper的分布式消息系统
    • Kafka具有高吞吐率、高性能、实时、高可靠等特点
  • Kafka 的组成概念
    • Topic: 一个虚拟的概念, 由1到多个Partitions组成
    • Partition: 实际消息存储单位
    • Producer: 消息生产者
    • Consumer: 消息消费者

1.2 安装Zookeeper(快捷)

# 修改配置文件
cp zoo_sample.cfg zoo.cfg

# 需要修改参数
dataDir=指定一个存储配置文件的目录

# 启动
./zkServer.sh start

# 连接测试
./zkCli.sh

1.3 安装Kafka


# 修改基本的配置文件
vim server.properties

# 需要修改的配置项
listeners=PLAINTEXT://[你的ip地址]:9092

advertised.listeners=PLAINTEXT://[你的ip地址]:9092

log.dirs=你的kafka的日志目录

zookeeper.connect=[你的ip地址]:2181

1.4 Kafka 的线上方案

  • 操作系统
    • 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的 快速数据传输特性。
    • Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证, 千万不要应用于生产环境。
  • 磁盘
    • 普通的机械磁盘 即可, Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘 最大的劣势,即随机读写操作慢。
    • 追求性价比的公司可以不搭建 RAID,使用普通磁盘组成存储空间即可。 [Kafka 1.1开始正式支持JBOD了。再说Kafka本身在软件层面也提供了冗余的机制来对 抗磁盘损坏。]
  • 磁盘容量
    • 在规划磁盘容量时你需要考虑下面这几个元素:
      • 新增消息数
      • 消息留存时间
      • 平均消息大小
      • 备份数
      • 是否启用压缩
      • 例子:
        • 假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周 时间。现在假设消息的平均大小是 1KB?
        • 每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空 间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息 数据还有其他类型的数据,
          • 比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空 间,因此总的存储容量就是 220GB。既然要保存两周那么整体容量即为 220GB * 14 大约 3TB 左右。Kafka 支持数据的压缩, 假设压缩比是 0.75,那么最后你需要规划的存储 空间就是 0.75 * 3 = 2.25TB。
  • 带宽
    • 对于 Kafka 这种通过网络大量进行数据传输的框架而言,带宽特别容易成为瓶颈。事实 上,在我接触的真实案例当中,带宽资源不足导致 Kafka 出现性能问题的比例至少占 60% 以上。 如果你的环境中还涉及跨机房传输,那么情况可能就更糟了。
    • 带宽也主要有 两种:1Gbps 的千兆网络和 10Gbps 的万兆网络,特别是千兆网络应该是一般公司网络的标准配置了。
    • 与其说是带宽资源的规划,其实真正要规划的是所需的 Kafka 服务器的数量。
    • 假设你公司 的机房环境是千兆网络,即 1Gbps现在你有个业务其业务目标或 SLA 是在 1 小时内处 理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
      • 让我们来计算一下,由于带宽是 1Gbps即每秒处理 1Gb 的数据,假设每台 Kafka 服务 器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混布其他服务,毕竟真实环境中不建议这么做。 通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其 他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比 较合理的值, 也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源, 故通常要再额外预留出 2/3 的资源, 即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需 要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少 此值。 好了,有了 240Mbps 我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根 据这个目标,我们每秒需要处理 2336Mb [10241024/36008] 的数据,除以 240约等于 10 台服务器。 如果 消息还需要额外复制两份,那么总的服务器台数还要乘以 3即 30 台
  • Kafka 性能测试脚本
    • kafka-producer-perf-test 脚本还不错kafka-consumer-perf-test有点难用
  • 混部
    • 一个机器上 部署一个zookeeper和一个kafka, 如果配置好ZooKeeper事务日志(比如设置好autopurge.purgeInterval及定期删除 snapshot等)它对IO的需求不大混布也是可以的。

  • 最最最重要的集群参数配置

    • Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题(Topic)级别的参数、JVM 端参数和操作系统级别的参数
  • Broker 端参数

    • 首先 Broker 是需要配置存储信息的,即 Broker 使用哪些 磁盘。那么针对存储信息的重要参数有以下这么几个:
      • log.dirs: 这是非常重要的参数,指定了 Broker 需要 使用的若干个文件目录路径。要知道这个参数是没有默认 值的,这说明什么?这说明它必须由你亲自指定
      • log.dir: 注意这是 dir结尾没有 s说明它只能表示 单个路径,它是补充上一个参数用的
      • 这两个参数应该怎么设置呢?很简单,你只要设置 log.dirs即第一个参数就好了不要设置log.dir。而且更重要的是在线上生产环境中一定要为log.dirs配置 多个路径, 具体格式是一个 CSV 格式,也就是用逗号分隔 的多个路径,比 如/home/kafka1,/home/kafka2,/home/kafka3这 样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。
        • 这样做有两个好处:
        • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
        • 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新 引入的强大功能。要知道在以前,只要 Kafka Broker 使 用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。 但是自 1.1 开始,这种情况被修正了, 坏掉的磁盘上的数 据会自动地转移到其他正常的磁盘上,而且 Broker 还能 正常工作。这个改进正是我们舍弃 RAID 方案的基 础:没有这种 Failover 的话,我们只能依靠 RAID 来提 供保障。
    • 与 ZooKeeper 相关的设置
      • 首先 ZooKeeper 是 做什么的呢?它是一个分布式协调框架,负责协调管理并保 存 Kafka 集群的所有元数据信息比如集群都有哪些Broker 在运行、创建了哪些 Topic每个 Topic 都有多少 分区以及这些分区的 Leader 副本都在哪些机器上等信息。
      • Kafka 与 ZooKeeper 相关的最重要的参数当属 zookeeper.connect
        • 这也是一个 CSV 格式的参数,比 如我可以指定它的值为 zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。
      • 如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概 念,类似于别名。
        • 如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2那么两套集群的zookeeper.connect参数可以 这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和 zk1:2181,zk2:2181,zk3:2181/kafka2
        • 切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指 定: zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3这样的格式是不对的。
    • 第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参 数:
      • listeners: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
        • advertised.listeners: 和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的, 就是说这组监听器是 Broker 用于对外发布的
          • [内网环境访问Kafka不需要配置这个参数常见的玩法是:你的Kafka Broker机器上配置了双网卡一 块网卡用于内网访问(即我们常说的内网IP);另一个块用 于外网访问。那么你可以配置listeners为内网IP advertised.listeners为外网IP。]
      • host.name/port: 列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
      • 监听器的概念,从构成上来说,它是若干个逗 号分隔的三元组,每个三元组的格式为<协议名称,主机 名,端口号>。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密 传输等; 也可能是你自己定义的协议名字,比如 CONTROLLER: //localhost:9092。
        • 一旦你自己定义了协议名称,你必须还要指定 listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议比如指定 listener.security.protocol.map=CONTROLLER:P LAINTEXT表示CONTROLLER这个自定义协议底层使用明文 不加密传输数据。
      • 经常有人会问主机 名这个设置中我到底使用 IP 地址还是主机名。这里我给出 统一的建议:最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。 Broker 源代码中也使用的 是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
    • 第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数:
      • auto.create.topics.enable:是否允许自动创建 Topic。
        • [建议最好设置成 false,即不允许自动创建 Topic。每个部门被分配的 Topic 应该由运维严格把控,决不能允许自行创建任何 Topic。]
      • unclean.leader.election.enable:是否允许 Unclean Leader 选举。
        • [每个分区都有多个副本来提供高可用。 在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。建议你还是显式地把它设置成 false 吧]
      • auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。
        • [对生产环境影响非常大,设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满 足一定的条件才会发生。严格来说它与上一个参数中Leader 选举的最大不同在于, 它不是选 Leader而是换 Leader!比如 Leader A 一直表现得很好,但若 true则有可能 一段时间后 Leader A 就要被强行卸任换成 Leader B。建议你在生产环境中把这 个参数设置成 false。]
    • 最后一组参数是数据留存方面的,即:
      • log.retention.{hour|minutes|ms}:这是个“三 兄弟”,都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。
        • [虽然 ms 设置有最高的优先级,但是 通常情况下我们还是设置 hour 级别的多一些,比如 log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使 用,那么这个值就要相应地调大。]
      • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
        • [这个值默认是 -1 表明你想在这台 Broker 上保存多少数据都可以,至少在容 量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参 数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集 群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间]
      • message.max.bytes:控制 Broker 能够接收的最大消息大小。
        • [不能使用默认值的参数,默认的 1000012 太少了,还不到 1MB。实际场景中突 破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一 个比较大的值还是比较保险的做法。毕竟它只是一个标尺而 已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置 大一点也不会耗费什么磁盘空间的。]
    • 补充参数
      • gg.handler.kafkahandler.Mode = tx gg.handler.kafkahandler.Mode = op 这两个的差别。遇到时 dml 数据会丢失的情况。用的是 op 。
        • 当设置成op单个数据库表的变更(插入、更新、删除) 会被当成一条Kafka消息发送;当设置成tx时数据库事务 所做的所有变更统一被封装进一条Kafka消息并在事务提 交后被发送。后者有事务性的保障,至少有原子性方面的保证,不会丢失部分 CDC 数据。
  • Topic 级别参数

    • Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值, 这就是所谓的 Topic 级别参数。
    • retention.ms: 规定了该 Topic 消息被保存的时长。 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦 设置了这个值,它会覆盖掉 Broker 端的全局参数值。
    • retention.bytes:规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1表示可以 无限使用磁盘空间。
    • Topic 级别 参数的设置就是这种情况,我们有两种方式可以设置:
      • 创建 Topic 时进行设置
      • 修改 Topic 时设置
    • 如何在创建 Topic 时设置这些参数
      • 你的部门需要将交易数据发送到 Kafka 进行处理,需要保 存最近半年的交易数据,同时这些数据很大,通常都有几 MB但一般不会超过 5MB。现在让我们用以下命令来创建 Topic:
        • bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replication-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880
      • 请注意结尾处的--config设置我们就是在 config 后面指定了想要设置的 Topic 级别参数。
        • 使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:
        • bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760
        • [你最好始终坚持使用第二种方式来设置并且在未来Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。]
  • JVM 参数

    • Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。
    • 有条件的话至少使用 Java 8 吧, [Kafka 2.0已经不支持Java 7了2.1版本开始初步支持Java 11但不建议生产环境用11所以还是使用Java 8吧。]
    • 堆大小这个参数至关重要。无脑给出一个通用的建议:将你的 JVM 堆大小设置成 6GB 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka说实话默认的 1GB 有点小, 毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例Heap Size 不能太小。
      • [没有通用的标准只有一个最佳实践值6GB。最好还是监控一下实时的堆大小特别是GC之后的live data大小通常将heapsize设置成其1.5~2倍就足以了]
    • JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7那么可以根据以下法则选择合适的垃圾回收器
      • 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
      • 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
    • 当然了,如果你已经在使用 Java 8 了[G1是jdk9中默认的jdk8还是需要显式指定的]。在没有任何调优的情况下G1 表现得要比 CMS 出色,主要体现在更少的 Full GC需要调整的参数更少等所以使用 G1 就好了。
    • Java8默认的新生代垃圾回收器是UseParallelGC可以用-XX:+PrintCommandLineFlags -version查看如果显示指定 -XX:+UseCurrentMarkSweepGC 的话,会默认开启 -XX:+UseParallelGC
    • 确定好了要设置的 JVM 参数,我们该如何为 Kafka 进行设置呢?
      • 只需要设置下面这两个环境变量即可:
        • KAFKA_HEAP_OPTS指定堆大小。
        • KAFKA_JVM_PERFORMANCE_OPTS指定 GC 参数。
      • 比如你可以这样启动 Kafka Broker即在启动 Kafka Broker 之前,先设置上这两个环境变量:
        • $> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
        • $> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
        • $> bin/kafka-server-start.sh config/server.properties
  • 操作系统参数

    • Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个:
      • 文件描述符限制
      • 文件系统类型
      • Swappiness
      • 提交时间
    • 首先是 ulimit -n
      • 觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如 ulimit -n 1000000
      • 但不设置的话后果很严重比如你会经常看到“Too many open files”的错误。
    • 其次是文件系统类型的选择。
      • 这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4所以生产环境最好还是使用 XFS。对了最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。
    • 第三是 swap 的调优
      • 网上很多文章都提到设置其为 0将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0当物理内存耗尽时操作系统会触发 OOM killer 这个组件, 它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑, 我个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1

1.5 kafka的基本操作


# 1、启动Kafka - todo 这个命令会让控制台有日志生成不太方便 
bin/kafka-server-start.sh config/server.properties &

# 2、停止Kafka
bin/kafka-server-stop.sh

# 3、创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiangzh-topic

# 4、查看已经创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 5、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.220.128:9092 --topic jiangzh-topic

# 6、接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic jiangzh-topic --from-beginning

# {"orderId":"002","price":"80"}

#  【旧版】
kafka-topics.sh --zookeeper 172.16.26.183:2181 --list
#  【新版】
kafka-topics.sh --bootstrap-server 172.16.26.183:9092  --list
#   zookeeper is not a recognized option主要原因是 Kafka 版本过高,命令不存在)
#  创建topic主题
kafka-topics.sh --bootstrap-server 172.16.26.183:9092  --create --topic topic1 --partitions 1 --replication-factor 3
#  --create 命令后 --topic 为创建topic 并指定 topic name
#  --partitions 为指定分区数量
#  --replication-factor 为指定副本集数量
#  向kafka集群发送数据
#  【无key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1
#  【有key型消息】 
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --property parse.key=true
#  默认消息键与消息值间使用“Tab键”进行分隔切勿使用转义字符(\t)
#  kafka命令接受数据
kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning
#  kafka查看消费进度当我们需要查看一个消费者组的消费进度时则使用下面的命令(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】
kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1

2. 客户端实践及原理剖析

2.1 生产者消息分区机制原理剖析

  • 为什么分区?

    • Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
    • 主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
    • 为什么使用分区的概念而不是直接使用多个主题呢?
      • 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性Scalability
      • 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理
      • 并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量
      • 实际上分区的概念以及分区数据库早在 1980 年就已经有大牛们在做了,比如那时候有个叫 Teradata 的数据库就引入了分区的概念
    • Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard而在 HBase 中则叫 Region在 Cassandra 中又被称作 vnode
    • 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序的问题
  • 都有哪些分区策略?

    • 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
    • 如果要自定义分区策略你需要显式地配置生产者端的参数partitioner.class
      • 在编写生产者程序时你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。
      • 这个接口也很简单只定义了两个方法partition()和close(),通常你只需要实现最重要的 partition 方法。
        • 方法签名int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
        • 这里的topic、key、keyBytes、value和valueBytes都属于消息数据cluster则是集群信息比如当前 Kafka 集群共有多少主题、多少 Broker 等)
        • Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
        • 只要你自己的实现类定义好了 partition 方法同时设置partitioner.class参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。
    • 比较常见的分区策略
      • 轮询策略
        • 也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0第二条被发送到分区 1第三条被发送到分区 2以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0就像下面这张图展示的那样。
        • 轮询策略
        • 这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
        • [轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。]
      • 随机策略
        • 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
        • 随机策略
        • 如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
          • List partitions = cluster.partitionsForTopic(topic);
          • return ThreadLocalRandom.current().nextInt(partitions.size());
          • 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
          • 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
      • 按消息键保序策略
        • 也称 Key-ordering 策略。这个名词是我自己编的Kafka 官网上并无这样的提法。
        • Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。 特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面, 由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。
        • 按消息键保序策略
        • 实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
          • List partitions = cluster.partitionsForTopic(topic);
          • return Math.abs(key.hashCode()) % partitions.size();
      • 前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key那么默认实现按消息键保序策略如果没有指定 Key则使用轮询策略。
      • 如何实现消息的顺序问题?
        • 企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。
        • 当时企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。 这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。
        • 后来经过了解和调研,我发现这种具有因果关系的消息都有一定的特点,比如在消息体中都封装了固定的标志位,改进建议他们对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区 这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。
        • 这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想
    • 其他分区策略
      • 其实还有一种比较常见的,即所谓的基于地理位置的分区策略。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
      • 假设的所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在考虑在南方找个城市(比如广州)再创建一个机房; 另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在广州。
      • 计划为每个新注册用户提供一份注册礼品,比如南方的用户可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。 如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
      • 但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。 换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!
      • 此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码:
        • List partitions = cluster.partitionsForTopic(topic);
        • return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
      • 我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。
  • 补充问题

    • 在消息重试的时候分区策略会重新再计算一次吗比如一开始选择到5号分区但是5号分区有问题导致重试重试的时候可以重试发送到别的分区上吗
      • 不会的。消息重试只是简单地将消息重新发送到之前的分区
    • 在看kafka-client生产者默认分区源码时看到了cluster.partitionsForTopic和cluster.availablePartitionsForTopic请问什么时候分区是available什么时候是不unavailable的
      • 分区没有leader的时候就是unavailable了。某些操作会导致瞬间没有leader比如分区reassign、换leader等
    • RocketMQ与Kafka的主要区别
      • Kafka吞吐量大多是面向大数据场景。RocketMQ吞吐量也很强 不过它号称是金融业务级的消息中间件,也就是说可以用于实际的业务系统;
      • RocketMQ毕竟是阿里出品在国内技术支持力度要比Kafka强
      • Kafka现在主要发力StreamingRocketMQ在流处理这块表现如何我不太清楚至少streaming不是它现阶段的主要卖点。
      • 其他方面这两者确实都差不多
    • 广州机房怎么消费广州partition的数据consumer如何指定消费的partition。
      • 使用这个方法consumer.assign()直接消息指定分区

2.2 生产者压缩算法面面观

  • 说起压缩compression我相信你一定不会感到陌生。它秉承了用时间去换空间的经典 trade-off 思想,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。

  • 怎么压缩?

    • Kafka 是如何压缩消息的呢?要弄清楚这个问题,就要从 Kafka 的消息格式说起了。目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。
    • 不论是哪个版本Kafka 的消息层次都分为两层:消息集合message set以及消息message一个消息集合中包含若干条日志项record item而日志项才是真正封装消息的地方。 Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作
    • V2 版本主要是针对 V1 版本的一些弊端做了修正
      • 一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。
        • 在 V2 版本中,消息的 CRC 校验工作就被移到了消息集合这一层。
      • V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。
        • 之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。
      • V2 版本都比 V1 版本节省磁盘空间。
  • 何时压缩?

    • 在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
  • 生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。

  • 下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 // 开启 GZIP 压缩
 props.put("compression.type", "gzip");
 
 Producer<String, String> producer = new KafkaProducer<>(props);
  • 这里比较关键的代码行是 props.put(“compression.type”, “gzip”),它表明该 Producer 的压缩算法使用的是 GZIP。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。

  • 在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。

  • 有两种例外情况就可能让 Broker 重新压缩消息。

    • 情况一Broker 端指定了和 Producer 端不同的压缩算法。
      • Producer 说:“我要使用 GZIP 进行压缩。”
      • Broker 说:“不好意思,我这边接收的消息必须使用 Snappy 算法进行压缩。”
      • Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type和上面那个例子中的同名。但是这个参数的默认值是 producer,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。 可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升
    • 情况二Broker 端发生了消息格式转换。
      • 在一个生产环境中Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外, 它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
    • 所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。如果有兴趣你可以深入地了解下 Zero Copy 的原理。
  • 何时解压缩?

    • 通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息
    • Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
    • 住这句话:Producer 端压缩、Broker 端保持、Consumer 端解压缩。
    • 除了在 Consumer 端解压缩Broker 端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。 我们必须承认这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言
    • 国内京东的小伙伴们刚刚向社区提出了一个 bugfix建议去掉因为做消息校验而引入的解压缩。据他们称去掉了解压缩之后Broker 端的 CPU 使用率至少降低了 50%
      • 目前社区并未采纳这个建议,原因就是这种消息校验是非常重要的,不可盲目去之。毕竟先把事情做对是最重要的,在做对的基础上,再考虑把事情做好做快
      • 你也可以思考一下,是否有一个两全其美的方案,既能避免消息解压缩也能对消息执行校验。
  • 各种压缩算法对比

    • 在 Kafka 2.1.0 版本之前Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始Kafka 正式支持 Zstandard 算法(简写为 zstd。它是 Facebook 开源的一个压缩算法能够提供超高的压缩比compression ratio
    • 对了,看一个压缩算法的优劣,有两个重要的指标:
      • 一个指标是压缩比,原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5显然压缩比越高越好
      • 另一个指标就是压缩 / 解压缩吞吐量,比如每秒能压缩或解压缩多少 MB 的数据。同样地,吞吐量也是越高越好。
      • Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:
      • Zstandard压缩算法对比
        • zstd 算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩。
        • 反观 LZ4 算法,它在吞吐量方面则是毫无疑问的执牛耳者。
    • 在实际使用中GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。
    • 但对于 Kafka 而言,它们的性能测试结果却出奇得一致,
      • 在吞吐量方面LZ4 > Snappy > zstd 和 GZIP
      • 在压缩比方面zstd > LZ4 > GZIP > Snappy
      • 具体到物理资源,使用 Snappy 算法占用的网络带宽最多zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
      • CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些
      • 在解压缩时 GZIP 算法则可能使用更多的 CPU
    • 最佳实践
      • 首先来说压缩。何时启用压缩是比较合适的时机呢?
        • 你现在已经知道 Producer 端完成的压缩,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
        • 除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。
        • 事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,带宽可是比 CPU 和内存还要珍贵的稀缺资源,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。
        • 如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗
      • 其次说说解压缩。其实也没什么可说的。
        • 一旦启用压缩,解压缩是不可避免的事情。
        • 这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。
  • 补充问题

    • 正常情况下broker端会原样保存起来但是为了检验需要解压缩。该怎么去理解这个过程呢broker端解压缩以后还会压缩还原吗
      • 它只是解压缩读取而已,不会将解压缩之后的数据回写到磁盘。另外就像我置顶的留言那样,目前社区已经接纳了京东小伙伴的修改,貌似可以绕过这部分解压缩了.
      • 规避了broker端为执行校验而做的解压缩操作代码也merge进了2.4版本。有兴趣的同学可以看一下:
    • 消息层次、消息集合、消息、日志项这些概念与它们之间的关系
      • 消息批次RecordBatch里面包含若干条消息record)。 你可以认为消息批次和消息集合是等价的,消息和日志项是等价的。这样消息层次有两层:外层是消息批次(或消息集合);里层是消息(或日志项)。 Producer以recordbatch为单位发送消息对于V2版本一个batch中通常包含多条消息。在V2版本中在batch层面计算CRC值在V1版本中每条消息都要计算CRC值。

2.3 无消息丢失配置怎么实现?

  • 一句话概括Kafka 只对**“已提交”的消息committed message做有限度的持久化保证**。

    • 第一个核心要素是 “已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
      • 那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况Kafka 只对已提交的消息做持久化保证这件事情是不变的。
    • 第二个核心要素就是 “有限度的持久化保证”
      • 假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立Kafka 就能保证你的这条消息永远不会丢失。
  • “消息丢失”案例

    • 案例 1生产者程序丢失数据
    • 目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API那么它通常会立即返回但此时你不能认为消息发送已成功完成。
    • 这种发送方式有个有趣的名字叫“fire and forget”翻译一下就是“发射后不管”。它的意思是执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”
    • 如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?
      • 其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)
    • 就像前面说过的Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。
    • 解决此问题的方法非常简单:
      • Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
        • (回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
      • 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。
      • 总之,处理发送失败的责任在 Producer 端而非 Broker 端。
    • 案例 2消费者程序丢失数据
      • Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。
      • Consumer端的位移数据
        • 比如对于 Consumer A 而言,它当前的位移值就是 9Consumer B 的位移值是 11。
        • 这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。
        • 正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。不能颠倒
        • 要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。
        • 当然,这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。
    • 还存在一种比较隐蔽的消息丢失场景。
      • 对于 Kafka 而言, Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。 假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
      • 这里的关键在于 Consumer 自动提交位移
      • 这个问题的解决方案也很简单:
        • 如果是多线程异步处理消费消息Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移
      • 单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。
  • 最佳实践

    • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法
    • 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
    • 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
    • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader必然会造成消息的丢失。 故一般都要将该参数设置成 false即不允许这种情况的发生。
    • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
    • 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
    • 确保 replication.factor > min.insync.replicas。如果两者相等那么只要有一个副本挂机整个分区就无法正常工作了。我们不仅要改善消息的持久性防止数据丢失还要在不降低可用性的基础上完成。 推荐设置成 replication.factor = min.insync.replicas + 1
    • 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
  • 补充

    • 设置 acks = all。表明所有副本 Broker 都要接收到消息该消息才算是“已提交”。如果所有的Broker都要收到消息才能算作已提交会不会对系统的吞吐量影响很大另外这里的副本指的是不是仅仅是ISR?
      • 碰到的实际场景影响还是很大的。acks=all时大部分的请求处理延时都花在了follower同步上。 是的acks=all表明所有ISR中的副本都要同步。

2.4 客户端都有哪些不常见但是很高级的功能?

3. Producer生产者

4. Consumer

5. Stream

6. Connect

7. Kafka集群部署与开发

8. Kafka集群监控、安全与最佳实践

9. Kafka面试点梳理