You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
22 KiB
Markdown

# Kafka
## 1. Kafka基本概念及配置
- 官方QuickStart: https://kafka.apache.org/quickstart
- 官方文档: https://kafka.apache.org/documentation/
- 最新版本 3.2.0 【截止2022-7-20】
- 学习版本 kafka_2.11-2.4.0【前面是scala后面是kafka】 zookeeper-3.5.7【截止2022-7-20】
### 1.1 概念
- 概述
- Kafka 是分布式的流处理平台
- Kafka 是基于Zookeeper的分布式消息系统
- Kafka具有高吞吐率、高性能、实时、高可靠等特点
- Kafka 的组成概念
- Topic: 一个虚拟的概念, 由1到多个Partitions组成
- Partition: 实际消息存储单位
- Producer: 消息生产者
- Consumer: 消息消费者
-
### 1.2 安装Zookeeper(快捷)
```shell
# 修改配置文件
cp zoo_sample.cfg zoo.cfg
# 需要修改参数
dataDir=指定一个存储配置文件的目录
# 启动
./zkServer.sh start
# 连接测试
./zkCli.sh
```
### 1.3 安装Kafka
```shell
# 修改基本的配置文件
vim server.properties
# 需要修改的配置项
listeners=PLAINTEXT://[你的ip地址]:9092
advertised.listeners=PLAINTEXT://[你的ip地址]:9092
log.dirs=你的kafka的日志目录
zookeeper.connect=[你的ip地址]:2181
```
### 1.4 Kafka 的线上方案
- 操作系统
- 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的 快速数据传输特性。
- Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证, 千万不要应用于生产环境。
- 磁盘
- **普通的机械磁盘** 即可, Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘 最大的劣势,即随机读写操作慢。
- 追求性价比的公司可以**不搭建 RAID**,使用**普通磁盘组成存储空间即可**。 [Kafka 1.1开始正式支持JBOD了。再说Kafka本身在软件层面也提供了冗余的机制来对 抗磁盘损坏。]
- 磁盘容量
- 在规划磁盘容量时你需要考虑下面这几个元素:
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
- 例子:
- 假设你所在公司有个业务每天需要向 Kafka 集群发送 **1 亿条消息**,每条消息**保存两份**以防止数据丢失,另外消息默认**保存两周** 时间。现在假设消息的**平均大小是 1KB**?
- 每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空 间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息 数据还有其他类型的数据,
- 比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空 间,因此总的存储容量就是 220GB。既然要保存两周那么整体容量即为 220GB * 14 大约 3TB 左右。Kafka 支持数据的压缩,
假设压缩比是 0.75,那么最后你需要规划的存储 空间就是 0.75 * 3 = 2.25TB。
- 带宽
- 对于 Kafka 这种通过网络大量进行数据传输的框架而言,**带宽特别容易成为瓶颈**。事实 上,在我接触的真实案例当中,带宽资源不足导致 Kafka 出现性能问题的比例至少占 60% 以上。
如果你的环境中还涉及跨机房传输,那么情况可能就更糟了。
- 带宽也主要有 两种:1Gbps 的千兆网络和 10Gbps 的万兆网络,特别是千兆网络应该是一般公司网络的标准配置了。
- 与其说是带宽资源的规划,其实真正要规划的是所需的 Kafka 服务器的数量。
- 假设你公司 的机房环境是千兆网络,即 1Gbps现在你有个业务其业务目标或 SLA 是在 1 小时内处 理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
- 让我们来计算一下,由于带宽是 1Gbps即每秒处理 1Gb 的数据,假设每台 Kafka 服务 器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混布其他服务,毕竟真实环境中不建议这么做。
通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其 他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比 较合理的值,
也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源, 故通常要再额外预留出 2/3 的资源,
即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需 要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少 此值。 好了,有了 240Mbps
我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根 据这个目标,我们**每秒需要处理 2336Mb [1024*1024/3600*8] 的数据,除以 240约等于 10 台服务器**。
如果 消**息还需要额外复制两份,那么总的服务器台数还要乘以 3即 30 台**。
- Kafka 性能测试脚本
- kafka-producer-perf-test 脚本还不错kafka-consumer-perf-test有点难用
- 混部
- 一个机器上 部署一个zookeeper和一个kafka, 如果配置好**ZooKeeper事务日志**(比如设置好autopurge.purgeInterval及定期删除 snapshot等)它对IO的需求不大混布也是可以的。
---
- 最最最重要的集群参数配置
- Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题(Topic)级别的参数、JVM 端参数和操作系统级别的参数
- Broker 端参数
- 首先 Broker 是需要配置存储信息的,即 Broker 使用哪些 磁盘。那么针对存储信息的重要参数有以下这么几个:
- **log.dirs**: 这是非常重要的参数,指定了 Broker 需要 使用的若干个文件目录路径。要知道这个参数是没有默认 值的,这说明什么?这说明它**必须由你亲自指定**。
- log.dir: 注意这是 dir结尾没有 s说明它只能表示 单个路径,它是**补充上一个参数用的**。
- 这两个参数应该怎么设置呢?很简单,你只要设置 log.dirs即第一个参数就好了不要设置log.dir。而且更重要的是在线上生产环境中一定要为log.dirs配置 多个路径,
具体格式是一个 CSV 格式,也就是用逗号分隔 的多个路径,比 如/home/kafka1,/home/kafka2,/home/kafka3这 样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。
- 这样做有两个好处:
- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新 引入的强大功能。要知道在以前,只要 Kafka Broker 使 用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。 但是自 1.1 开始,这种情况被修正了,
坏掉的磁盘上的数 据会自动地转移到其他正常的磁盘上,而且 Broker 还能 正常工作。这个改进正是我们舍弃 RAID 方案的基 础:没有这种 Failover 的话,我们只能依靠 RAID 来提 供保障。
- 与 ZooKeeper 相关的设置
- 首先 ZooKeeper 是 做什么的呢?它是一个分布式协调框架,负责协调管理并保 存 Kafka 集群的所有元数据信息比如集群都有哪些Broker 在运行、创建了哪些 Topic每个 Topic 都有多少 分区以及这些分区的 Leader 副本都在哪些机器上等信息。
- Kafka 与 ZooKeeper 相关的最重要的参数当属 **zookeeper.connect**
- 这也是一个 CSV 格式的参数,比 如我可以指定它的值为 **zk1:2181,zk2:2181,zk3:2181**。2181 是 ZooKeeper 的默认端口。
- 如果我让**多个 Kafka 集群使用同一套 ZooKeeper 集群**,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概 念,类似于别名。
- 如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2那么两套集群的zookeeper.connect参数可以 这样指定:**zk1:2181,zk2:2181,zk3:2181/kafka1和 zk1:2181,zk2:2181,zk3:2181/kafka2**。
- **切记 chroot 只需要写一次,而且是加到最后的**。我经常碰到有人这样指 定: zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3这样的格式是不对的。
- 第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参 数:
- **listeners**: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
- advertised.listeners: 和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的, 就是说**这组监听器是 Broker 用于对外发布的**。
- [内网环境访问Kafka不需要配置这个参数常见的玩法是:你的Kafka Broker机器上配置了双网卡一 块网卡用于内网访问(即我们常说的内网IP);另一个块用 于外网访问。那么你可以配置listeners为内网IP advertised.listeners为外网IP。]
- ~~host.name/port~~: 列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
- 监听器的概念,从构成上来说,它是若干个逗 号分隔的三元组,**每个三元组的格式为<协议名称,主机 名,端口号>**。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密 传输等;
也可能是你自己定义的协议名字,比如 CONTROLLER: //localhost:9092。
- 一旦你自己定义了协议名称,你必须还要指定 listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议比如指定 listener.security.protocol.map=CONTROLLER:P LAINTEXT表示CONTROLLER这个自定义协议底层使用明文 不加密传输数据。
- 经常有人会问主机 名这个设置中我到底使用 IP 地址还是主机名。这里我给出 统一的建议:**最好全部使用主机名**,即 **Broker 端和 Client 端应用配置中全部填写主机名**。 Broker 源代码中也使用的 是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
- 第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数:
- auto.create.topics.enable:是否允许自动创建 Topic。
- [建议最好设置成 **false**,即不允许自动创建 Topic。每个部门被分配的 Topic 应该由运维严格把控,决不能允许自行创建任何 Topic。]
- unclean.leader.election.enable:是否允许 Unclean Leader 选举。
- [每个分区都有多个副本来提供高可用。 在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。建议你还是显式地把它设置成 **false** 吧]
- auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。
- [对生产环境影响非常大,设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满 足一定的条件才会发生。严格来说它与上一个参数中Leader 选举的最大不同在于,
它不是选 Leader而是换 Leader!比如 Leader A 一直表现得很好,但若 true则有可能 一段时间后 Leader A 就要被强行卸任换成 Leader B。建议你在生产环境中把这 个参数设置成 **false**。]
- 最后一组参数是数据留存方面的,即:
- log.retention.{hour|minutes|ms}:这是个“三 兄弟”,都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。
- [虽然 ms 设置有最高的优先级,但是 通常情况下我们还是设置 hour 级别的多一些,比如 log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使 用,那么这个值就要相应地调大。]
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
- [这个值默认是 -1 表明你想在这台 Broker 上保存多少数据都可以,至少在容 量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参 数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集 群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间]
- message.max.bytes:控制 Broker 能够接收的最大消息大小。
- [不能使用默认值的参数,默认的 1000012 太少了,还不到 1MB。实际场景中突 破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一 个比较大的值还是比较保险的做法。毕竟它只是一个标尺而 已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置 大一点也不会耗费什么磁盘空间的。]
- 补充参数
- gg.handler.kafkahandler.Mode = tx gg.handler.kafkahandler.Mode = op 这两个的差别。遇到时 dml 数据会丢失的情况。用的是 op 。
- 当设置成op单个数据库表的变更(插入、更新、删除) 会被当成一条Kafka消息发送;当设置成tx时数据库事务 所做的所有变更统一被封装进一条Kafka消息并在事务提 交后被发送。后者有事务性的保障,至少有原子性方面的保证,不会丢失部分 CDC 数据。
- Topic 级别参数
- Topic 级别参数会**覆盖全局 Broker 参数的值**,而每个 Topic 都能设置自己的参数值, 这就是所谓的 Topic 级别参数。
- **retention.ms**: 规定了该 Topic 消息被保存的时长。 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦 设置了这个值,它会覆盖掉 Broker 端的全局参数值。
- **retention.bytes**:规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1表示可以 无限使用磁盘空间。
- Topic 级别 参数的设置就是这种情况,我们有两种方式可以设置:
- 创建 Topic 时进行设置
- 修改 Topic 时设置
- 如何在创建 Topic 时设置这些参数
- 你的部门需要将交易数据发送到 Kafka 进行处理,需要保 存最近半年的交易数据,同时这些数据很大,通常都有几 MB但一般不会超过 5MB。现在让我们用以下命令来创建 Topic:
- bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replication-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880
- 请注意结尾处的--config设置我们就是在 config 后面指定了想要设置的 Topic 级别参数。
- 使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:
- bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760
- [你最好始终坚持使用第二种方式来设置并且在未来Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。]
- JVM 参数
- Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。
- 有条件的话至少使用 Java 8 吧, [Kafka 2.0已经不支持Java 7了2.1版本开始初步支持Java 11但不建议生产环境用11所以还是使用Java 8吧。]
- 堆大小这个参数至关重要。无脑给出一个通用的建议:将你的 JVM 堆大小设置成 **6GB** 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka说实话默认的 1GB 有点小,
毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例Heap Size 不能太小。
- [没有通用的标准只有一个最佳实践值6GB。最好还是监控一下实时的堆大小特别是**GC之后的live data大小通常将heapsize设置成其1.5~2倍就足以了**]
- JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7那么可以根据以下法则选择合适的垃圾回收器
- 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
- 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
- 当然了,如果你已经在使用 Java 8 了[G1是jdk9中默认的jdk8还是需要显式指定的]。在没有任何调优的情况下G1 表现得要比 CMS 出色,主要体现在更少的 Full GC需要调整的参数更少等所以使用 G1 就好了。
- Java8默认的新生代垃圾回收器是UseParallelGC可以用-XX:+PrintCommandLineFlags -version查看如果显示指定 -XX:+UseCurrentMarkSweepGC 的话,会默认开启 -XX:+UseParallelGC
- 确定好了要设置的 JVM 参数,我们该如何为 Kafka 进行设置呢?
- 只需要设置下面这两个环境变量即可:
- KAFKA_HEAP_OPTS指定堆大小。
- KAFKA_JVM_PERFORMANCE_OPTS指定 GC 参数。
- 比如你可以这样启动 Kafka Broker即在启动 Kafka Broker 之前,先设置上这两个环境变量:
- $> **export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g**
- $> **export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true**
- $> **bin/kafka-server-start.sh config/server.properties**
- 操作系统参数
- Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个:
- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间
- 首先是 **ulimit -n**
- 觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如 **ulimit -n 1000000**
- 但不设置的话后果很严重比如你会经常看到“Too many open files”的错误。
- 其次是文件系统类型的选择。
- 这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,**XFS** 的性能要强于 ext4所以生产环境最好还是使用 XFS。对了最近有个 Kafka 使用 **ZFS** 的数据报告,貌似性能更加强劲,有条件的话不妨一试。
- 第三是 swap 的调优
- 网上很多文章都提到设置其为 0将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0当物理内存耗尽时操作系统会触发 OOM killer 这个组件,
它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,
我**个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1**。
### 1.5 kafka的基本操作
```shell
# 1、启动Kafka - todo 这个命令会让控制台有日志生成不太方便
bin/kafka-server-start.sh config/server.properties &
# 2、停止Kafka
bin/kafka-server-stop.sh
# 3、创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiangzh-topic
# 4、查看已经创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 5、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.220.128:9092 --topic jiangzh-topic
# 6、接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic jiangzh-topic --from-beginning
# {"orderId":"002","price":"80"}
```
```shell
# 【旧版】
kafka-topics.sh --zookeeper 172.16.26.183:2181 --list
# 【新版】
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --list
# zookeeper is not a recognized option主要原因是 Kafka 版本过高,命令不存在)
# 创建topic主题
kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --create --topic topic1 --partitions 1 --replication-factor 3
# --create 命令后 --topic 为创建topic 并指定 topic name
# --partitions 为指定分区数量
# --replication-factor 为指定副本集数量
# 向kafka集群发送数据
# 【无key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1
# 【有key型消息】
kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --property parse.key=true
# 默认消息键与消息值间使用“Tab键”进行分隔切勿使用转义字符(\t)
# kafka命令接受数据
kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning
# kafka查看消费进度当我们需要查看一个消费者组的消费进度时则使用下面的命令(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】
kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1
```
## 2. Kafka客户端操作
## 3. Producer生产者
## 4. Consumer
## 5. Stream
## 6. Connect
## 7. Kafka集群部署与开发
## 8. Kafka集群监控、安全与最佳实践
## 9. Kafka面试点梳理