# RocketMQ ## 1. RocketMQ 基础 - 官方QuickStart: https://github.com/apache/rocketmq/tree/master/docs/cn - 官方文档: https://rocketmq.apache.org/docs/ - 最新版本 4.9.4 【截止2022-7-20】 - 学习版本 4.3.0 【截止2022-7-20】 ### 1.1 整体介绍 - RocketMQ是一款分布式、队列模型的消息中间件 - 特性: - 支持分布式事物 - 支持集群模型、负载均衡、水平扩展能力 - 亿级别的消息堆积能力 - 采用零拷贝的原理、顺序写盘、随机读 - 丰富的API封装 - 代码优秀,底层通信框架采用Netty NIO框架 - NameServer代替Zookeeper - 集群无单点,可扩展,任意一点高可用,水平可扩展 - 消息失败重试机制,消息可查询 - 开源社区活跃、成熟度(经过双十一考验) - 理解的问题: - MQ如何集群化部署来支撑高并发访问? - MQ如果要存储海量消息应该怎么做? - 高可用保障: 万一Broker宕机了怎么办? - 数据路由: 怎么知道访问哪个Broker? ### 1.2 概念模型 - Producer: 消息生产者,负责生产消息,一般由业务系统负责产生消息 --- - Consumer: 消息消费者,负责消费消息,一般是后台系统负责异步消费 - Push Consumer: Consumer的一种, 需要向Consumer对象注册监听 - Pull Consumer: Consumer的一种, 需要主动请求Broker拉取消息 --- - Producer Group: 生产者集合, 一般用于发送一类消息 - Consumer Group: 消费者集合, 一般用于接收一类消息进行消费 --- - Broker: MQ消息服务(中转角色, 用于消息存储与生产消费转发) - Message: TODO ### 1.3 源码工程讲解 - 编译完成的文件在 rocket-distribution/target/ 下 - RocketMQ的源码结构 - rocketmq-broker: 主要的业务逻辑, 消息收发, 主从同步, pagecache - rocketmq-client: 客户端接口, 比如生产者和消费者 - rocketmq-example: 示例, 比如生产者和消费者 - rocketmq-common: 公用数据结构等等 - rocketmq-distribution: 编译模块, 编译输出等 - rocketmq-filter: 进行Broker过滤的不感兴趣的消息传输,减小带宽压力 - rocketmq-logappender, rocketmq-logging 日志相关 - rocketmq-namesrv Namesrv服务, 用于服务协调 - rocketmq-openmessaging 对外提供服务 - rocketmq-remoting 远程调用接口, 封装Netty底层通信 - rocketmq-util 提供一些公用的工具方法, 比如解析命令行参数 - rocketmq-store 消息存储 - rocketmq-test - rocketmq-tools 管理工具, 比如有名的mqadmin工具 ### 1.4 环境搭建 - 参考 RocketMQ使用手册.pdf - ps: 内存大小至少也要1GB - ps: 先启动NameSrv 再启动Broker - ps: 测试使用1个配置文件即可 ### 1.5 RocketMQ 控制台使用 - RocketMQ 扩展组件: https://github.com/apache/rocketmq-externals - 可以集成控制台,修改控制台即可 - 集成SpringBoot也在这里 ## 2. RocketMQ 基本入门 ### 2.1 生产者的使用及使用控制台查消息 1. 创建生产者对象 DefaultMQProducer (必须要对生产者组的名称进行配置) 2. 设置 NamesrvAddr 3. 设置一些需要配置的参数 4. 启动生产者服务 5. 创建消息并发送 ### 2.2 消费者的使用及Broker重试机制 1. 创建消费者对象 DefaultMQPushConsumer 2. 设置NamesrvAddr及其消费位置ConsumerFromWhere 3. 进行订阅主题 subscribe 4. 注册监听并消费 registerMessageListener ### 2.3 RocketMQ - 集群环境构建详解 - 单点模式 - 主从模式 - 双主模式 - 双主双从,多主多从模式(推荐) - 主从模式环境构建可以保障消息的即时性与可靠性 - 投递一条消息后, 关闭主节点, 从节点继续可以提供消费者数据进行消费, 但是不能接收消息, 主节点上线后进行消费进行offset同步 - 集群的关闭和启动 参考手册-数据清理: RocketMQ使用手册.pdf - 集群配置 参考手册-集群配置: RocketMQ使用手册.pdf ## 3. RocketMQ 生产者核心研究 ### 3.1 RocketMQ生产者-核心参数讲解 - producerGroup: 组名 - createTopicKey: 创建Topic, 正式开发不能由开发组进行指定的, 对createTopicKey进行封装,不对外进行使用 - defaultTopicQueueNums (默认为4) - sendMsgTimeout(单位: ms) - compressMsgBodyOverHowmuch(默认压缩字节4096,默认有消息的压缩,可以提升发送性能) - retryTimesWhenSendFailed(有同步的重发策略和同名的异步重发策略) - retryAnotherBrokerWhenNotStoreOK(默认为false, 没存储成功使用其他的Broker进行存储) - maxMessageSize(默认是128k) - ... ### 3.2 主从同步机制分析 - 同步大致概念 - 同步信息: 数据内容 + 元数据信息 - 元数据同步: Broker角色识别, 为Slave则启动同步任务 - 消息同步: HAService, HAConnection, WaitNotifyObject - 同步源码设计 - 首先明确同步元数据肯定是在Broker中,且肯定是执行定时任务。 从Broker中进入 org.apache.rocketmq.broker.BrokerController 387行 - ```java ... // 如果角色是从节点 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { ... // 在固定的时间执行定时任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ... // 启动定时任务进行同步元数据信息 BrokerController.this.slaveSynchronize.syncAll(); ``` - 具体的同步逻辑看下 SlaveSynchronize 下的 syncAll()方法 - ```java public void syncAll() { // 同步 Topic 配置信息 this.syncTopicConfig(); // 同步消费者偏移量 this.syncConsumerOffset(); // 同步延迟偏移量 this.syncDelayOffset(); // 同步订阅组配置信息 this.syncSubscriptionGroupConfig(); } ``` - 查看 syncTopicConfig() 的同步逻辑 - ```java ... TopicConfigSerializeWrapper topicWrapper = this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); ... // getAllTopicConfig(masterAddrBak) -> this.remotingClient.invokeSync(...) ... // 进行同步调用 涉及到netty remoting模块的代码 NettyRemotingClient invokeSync(...) // 算出极限耗时代价 long costTime = System.currentTimeMillis() - beginStartTime; // 如果同步等待的时长 < 极限耗时 说明请求不成立 直接抛出 RemotingTimeoutException if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // 同步请求 kernel RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); ... ``` - invokeSyncImpl(...),真实的同步请求的实现 - ```java ... public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { // opaque 相当于request ID, 每发送一次请求request,创建一个RemotingCommand实例, final int opaque = request.getOpaque(); try { // 根据opaque(request ID)构建 ResponseFuture final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 将 ResponseFuture 放入 responseTable this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); // 使用 netty channel 来发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { // 轮询监听 @Override public void operationComplete(ChannelFuture f) throws Exception { // OK setSendRequestOK 为 true if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { // 否则 setSendRequestOK 为 false responseFuture.setSendRequestOK(false); } // 异常情况 // 设置 Map 快速清除 responseTable.remove(opaque); // 设置异常 responseFuture.setCause(f.cause()); // 赋值为NULL responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } ... ``` - 总结: RocketMQ Topic同步 底层实际是使用 Netty 做的同步通信 - TODO 其他的同步可以后续进行补充 --- - 数据同步逻辑在 org.apache.rocketmq.store.ha 下的 HAConnection HAService WaitNotifyObject 中 - HAConnection 主要是做数据的读写操作的 - 主要逻辑 - Master节点: - AcceptSocketService: 接收Slave节点连接 - HAConnection - ReadSocketService: 读来自Slave节点的数据 - WriteSocketService: 写往Slave节点的数据 - Slave节点: - HAService - HAClient:对Master节点连接, 读写请求 - 通信协议: Master节点与Slave节点 通信协议都很简单, 只有如下两条 - Slave -> Master : 上报CommitLog已经同步到的物理位置 - Master -> Slave : 传输新的CommitLog数据 - TODO 后续自己补充源码 ### 3.3 RocketMQ同步消息发送和异步发送消息 - 同步发送: producer.send(msg) - 同步发送消息核心实现: DefaultMQProducerImpl - 异步发送: producer.send(Message msg, SendCallback sendCallback) - 异步发送消息核心实现: DefaultMQProducerImpl ### 3.4 Netty底层框架解析 - 核心类 RemotingService RemotingClient RemotingServer NettyRemotingClient NettyRemotingServer NettyRemotingAbstract - 协议 消息长度+序列化类型&&头部长度+消息头数据+消息主体数据 ### 3.5 RocketMQ生产者-消息返回状态详解 - SEND_OK - FLUSH_DISK_TIMEOUT 要做补偿和可靠性设计 - FLUSH_SLAVE_TIMEOUT 要做补偿和可靠性设计 - SLAVE_NOT_AVAILABLE 要做补偿和可靠性设计 ### 3.7 RocketMQ生产者-延迟消息 - 延迟消息: 消息发到Broker后, 要特定的时间才会被Consumer消费 - 目前只支持固定精度的定时消息[因为对消息的顺序有一定的要求,当我们不固定精度,会加大Broker的性能开销] - MessageStoreConfig 配置类& ScheduleMessageService 任务类 - setDelayTimeLevel 方法设置 ### 3.8 RocketMQ生产者-自定义消息发送策略 - 如何把消息发送到指定队列(Message Queue) - MessageQueueSelector - producer.send(Msg, selector, Obj) ## 4. RocketMQ 消费者核心研究 ## 5. RocketMQ 核心原理研究 ## 6. 双主双从部署 ### 6.1 设计生产的架构 - NameServer集群化部署, 保证高可用, 每台NameServer实际上都会有完整的集群路由信息,包括所有的Broker节点信息,我们的数据信息 - 基于Dledger的Broker主从架构部署 - RocketMQ 4.5以前的那种普通的Master-Slave架构来部署,能在一定程度上保证数据不丢失,也能保证一定的可用性 - 但是那种方式的缺陷是很明显的,最大的问题就是当Master Broker挂了之后,没办法让Slave Broker自动切换为新的Master Broker,需要手工做一些运维操作,修改配置以及重启机器才行,这个非常麻烦。 - Dledger技术是要求至少得是一个Master带两个Slave,这样有三个Broke组成一个Group,也就是作为一个分组来运行。一旦 Master宕机,他就可以从剩余的两个Slave中选举出来一个新的Master对外提供服务。 - Broker是如何跟NameServer进行通信的 - Broker会每隔30秒发送心跳到所有的NameServer 上去,然后每个NameServer都会每隔10s检查一次有没有哪个Broker超过120s没发送心跳的,如果有,就认为那个Broker已经宕机 了,从路由信息里要摘除这个Broker。 - RocketMQ的实现中,采用的是TCP长连接进行通信。 - Broker会跟每个NameServer都建立一个TCP长连接,然后定时通过TCP长连接发送心跳请求过去 - 使用MQ的系统都要多机器集群部署 - 无论作为生产者还是消费者的系统,都应 该多机器集群化部署,保证他自己本身作为生产者或者消费者的高可用性 - MQ的核心数据模型:Topic到底是什么? - 就是一个数据集合的意思 - 系统如果要往MQ里写入消息或者获取消息,首先得创建一些Topic,作为数据集合存放不同类型的消息 - Topic作为一个数据集合是怎么在Broker集群里存储的 - 我们可以在创建Topic的时候指定让他里面的数据分散存储在多台Broker机器上,比如一个Topic里有1000万条数据,此时有2台 Broker,那么就可以让每台Broker上都放500万条数据 - 生产者系统是如何将消息发送给Broker的? - 生产者一定是投递消息到Master Broker的,然后Master Broker会同步数据给他的Slave Brokers,实现 一份数据多份副本,保证Master故障的时候数据不丢失,而且可以自动把Slave切换为Master提供服务。 - 消费者是如何从Broker上拉取消息的? - 消费者系统可能会从Master Broker拉取消息,也可能从Slave Broker拉取消息,都有可能,一切都看具体 情况。 ### 6.2 部署机器配置 - NameServer:3台机器,每台机器都是8核CPU + 16G内存 + 500G磁盘 + 千兆网卡 - Broker:3台机器,每台机器都是24核CPU(两颗x86_64 cpu,每颗cpu是12核) + 48G内存 + 1TB磁盘 + 千兆网卡 - 生产者:2台机器,每台机器都是4核CPU + 8G内存 + 500GB磁盘 + 千兆网卡 - 消费者:2台机器,每台机器都是4核CPU + 8G内存 + 500GB磁盘 + 千兆网卡 - NameServer是核心的路由服务,所以给8核16G的较高配置的机器,但是他一般就是承载Broker注册和心跳、系统的路由表拉取等请 求,负载其实很低,因此不需要特别高的机器配置,部署三台也可以实现高可用的效果了。 - Broker是最负载最高的,未来要承载高并发写入和海量数据存储,所以把最高配置的机器都会留给他,这里用3台机器组成一个“单 Master + 双Slave”的集群。 - 生产者和消费者机器都是临时用来测试的,而且一般他们都是业务系统,只会部署在标准的4核8G的机器配置下。 ### 6.3 参数调整 - PS: 中间件系统在压测或者上生产之前,需要对三大块参数进行调整:OS内核参数、JVM参数以及中间件核心参数 - PS: OS内核参数主要调整的地方都是跟磁盘IO、网络通信、内存管理以及线程管理有关的,需要适当调节大小 - PS: JVM参数需要我们去中间件系统的启动脚本中寻找他的默认JVM参数,然后根据机器的情况,对JVM的堆内存大小,新生代大 小,Direct Buffer大小,等等,做出一些调整,发挥机器的资源 - PS: 中间件核心参数主要也是关注其中跟网络通信、磁盘IO、线程数量、内存 管理相关的,根据机器资源,适当可以增加网络通信线程,控制同步刷磁盘或者异步刷磁盘,线程数量有多少,内存中一些队列的大小 --- #### A. 对RocketMQ集群进行OS内核参数的调整 (1) vm.overcommit_memory - “vm.overcommit_memory”这个参数有三个值可以选择,0、1、2。 - 0: 如果值是0的话,在你的中间件系统申请内存的时候,os内核会检查可用内存是否足够,如果足够的话就分配内存给你,如果感觉剩余 内存不是太够了,干脆就拒绝你的申请,导致你申请内存失败,进而导致中间件系统异常出错。
- 1: 一般需要将这个参数的值调整为1,意思是把所有可用的物理内存都允许分配给你,只要有内存就给你来用,这样可以避免申请内存失败的问题。 > 可以用如下命令修改: echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf (2) vm.max_map_count - 这个参数的值会影响中间件系统可以开启的线程的数量,同样也是非常重要的 - 如果这个参数过小,有的时候可能会导致有些中间件无法开启足够的线程,进而导致报错,甚至中间件系统挂掉。 - 他的默认值是65536,但是这个值有时候是不够的,比如大数据团队的生产环境部署的Kafka集群曾经有一次就报出过这个异常,说无法开启足够多的线程,直接导致Kafka宕机了。 - 因此建议可以把这个参数调大10倍,比如655360这样的值,保证中间件可以开启足够多的线程。 > 可以用如下命令修改:echo 'vm.max_map_count=655360' >> /etc/sysctl.conf (3) vm.swappiness - 这个参数是用来控制进程的swap行为的,这个简单来说就是os会把一部分磁盘空间作为swap区域,然后如果有的进程现在可能不是太 活跃, 就会被操作系统把进程调整为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让这个进程把原来占用的内存空间腾出 来,交给其他活跃运行的进程来使用。 - 如果这个参数的值设置为0,意思就是尽量别把任何一个进程放到磁盘swap区域去,尽量大家都用物理内存。 - 如果这个参数的值是100,那么意思就是尽量把一些进程给放到磁盘swap区域去,内存腾出来给活跃的进程使用。 - 默认这个参数的值是60,有点偏高了,可能会导致我们的中间件运行不活跃的时候被迫腾出内存空间然后放磁盘swap区域去。 - 因此通常在生产环境建议把这个参数调整小一些,比如设置为10,尽量用物理内存,别放磁盘swap区域去。 > 可以用如下命令修改:echo 'vm.swappiness=10' >> /etc/sysctl.conf (4) ulimit - 这个是用来控制linux上的最大文件链接数的,默认值可能是1024,一般肯定是不够的,因为你在大量频繁的读写磁盘文件的时候,或 者是进行网络通信的时候,都会跟这个参数有关系 - 对于一个中间件系统而言肯定是不能使用默认值的,如果你采用默认值,很可能在线上会出现如下错误:error: too many open files > 因此通常建议用如下命令修改这个值:echo 'ulimit -n 1000000' >> /etc/profile (5) 总结: - 要调整的东西,无非都是跟磁盘文件IO、网络通信、内存管理、线程数量有关系的, 因为我们的中间件系统在运行的时候无非就是跟这些打交道 - 中间件系统肯定要开启大量的线程(跟vm.max_map_count有关) - 而且要进行大量的网络通信和磁盘IO(跟ulimit有关) - 然后大量的使用内存(跟vm.swappiness和vm.overcommit_memory有关) - 所以对OS内核参数的调整,往往也就是围绕跟中间件系统运行最相关的一些东西 #### B. 对JVM参数进行调整 - 在rocketmq/distribution/target/apache-rocketmq/bin目录下,就有对应的启动脚本,比如: - mqbroker是用来启动Broker的 - mqnamesvr是用来启动NameServer的 - 用mqbroker来举例,我们查看这个脚本里的内容,最后有如下一行: - sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@ - 这一行内容就是用runbroker.sh脚本来启动一个JVM进程,JVM进程刚开始执行的main类就是 org.apache.rocketmq.broker.BrokerStartup - runbroker.sh脚本,在里面可以看到如下内容: ```shell JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 - XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0" JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps - XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" ``` - 在上面的内容中,其实就是在为启动Broker设置对应的JVM参数和其他一些参数 - PS: 狸猫技术窝的《从0开始带你成为JVM实战高手》这个专栏, 可以去学习MQ相关的内容 --- - **[了解]** -server:这个参数就是说用服务器模式启动,这个没什么可说的,现在一般都是如此 - **[重点调整]** -Xms8g -Xmx8g -Xmn4g:这个就是很关键的一块参数了,也是重点需要调整的,就是默认的堆大小是8g内存,新生代是4g内存, 如果我们的高配物理机是48g内存的 - 所以这里完全可以给他们翻几倍,比如给堆内存20g,其中新生代给10g,甚至可以更多一些,当然要留一些内存给操作系统来用,但是要注意最小不能低于1g - **[重点调整]** -XX:+UseG1GC -XX:G1HeapRegionSize=16m:这几个参数也是至关重要的,这是选用了G1垃圾回收器来做分代回收,对新生代 和老年代都是用G1来回收 - 这里把G1的region大小设置为了16m,这个因为机器内存比较多,所以region大小可以调大一些给到16m,不然用2m的region,会导致region数量过多的 - **[了解]** -XX:G1ReservePercent=25:这个参数是说,在G1管理的老年代里预留25%的空闲内存,保证新生代对象晋升到老年代的时候有足 够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了 - 默认值是10%,略微偏少,这里RocketMQ给调大了一些 - **[了解]** -XX:InitiatingHeapOccupancyPercent=30:这个参数是说,当堆内存的使用率达到30%之后就会自动启动G1的并发垃圾回收,开始尝试回收一些垃圾对象 - 默认值是45%,这里调低了一些,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题 - **[重点调整]** -XX:SoftRefLRUPolicyMSPerMB=0:这个参数默认设置为0了 - 在JVM优化专栏中,救火队队长讲过这个参数引发的案例,其实建议这个参数不要设置为0,避免频繁回收一些软引用的Class对象,这里可以调整为比如1000 - **[了解]** -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps - XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation - XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m:这一堆参数都是控制GC日志打印输出的,确定了gc日志文件的地址,要 打印哪些详细信息,然后控制每个gc日志文件的大小是30m,最多保留5个gc日志文件。 - **[了解]** -XX:-OmitStackTraceInFastThrow:这个参数是说,有时候JVM会抛弃一些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来 - **[了解]** -XX:+AlwaysPreTouch:这个参数的意思是我们刚开始指定JVM用多少内存,不会真正分配给他,会在实际需要使用的时候再分配给 他 - 所以使用这个参数之后,就是强制让JVM启动的时候直接分配我们指定的内存,不要等到使用内存的时候再分配 - **[可以调整]** -XX:MaxDirectMemorySize=15g:这是说RocketMQ里大量用了NIO中的direct buffer,这里限定了direct buffer最多申请多少, 如果你机器内存比较大,可以适当调大这个值,如果有朋友不了解direct buffer是什么,可以自己查阅一些资料。 - **[了解]** -XX:-UseLargePages -XX:-UseBiasedLocking:这两个参数的意思是禁用大内存页和偏向锁,这两个参数对应的概念每个要说清楚 都得一篇文章,所以这里大家直接知道人家禁用了两个特性即可。 --- - 总结: - RocketMQ默认的JVM参数是采用了G1垃圾回收器,默认堆内存大小是8G - 这个其实完全可以根据大家的机器内存来调整,你可以增大一些也是没有问题的,然后就是一些G1的垃圾回收的行为参数做了调整, 这个一般我们不用去动,然后就是对GC日志打印做了设置,这个一般也不用动。 - 其余的就是禁用一些特性,开启一些特性,这些都直接维持RocketMQ的默认值即可 #### C. 对RocketMQ核心参数进行调整 - 在下面的目录里有dledger的示例配置文件:rocketmq/distribution/target/apache- rocketmq/conf/dledger - 在这里主要是有一个较为核心的参数:sendMessageThreadPoolNums=16 - 这个参数的意思就是RocketMQ内部用来发送消息的线程池的线程数量,默认是16 - 其实这个参数可以根据你的机器的CPU核数进行适当增加,比如机器CPU是24核的,可以增加这个线程数量到24或者30,都是可以 的 ### 6.4 压测 - 我们在压测的时候一方面要关注RocketMQ能抗下多少TPS,一方面还要关注RocketMQ部署的几台机器的资源使用率和负载情况 - 我们平时做压测,主要关注的还是要压测出来一个最合适的最高负载。 - 意思就是在RocketMQ的TPS和机器的资源使用率和负载之间取得一个平衡。 - 以下压测过程以及压测结果,都是根据我们之前真实的RocketMQ压测报告总结而来,非常的有代表性,大家完全可以结合我们之前说的机器配置来参考一下 #### (1) RocketMQ的TPS和消息延时 - 我们让两个Producer不停的往RocketMQ集群发送消息,每个Producer所在机器启动了**80个线程**,相当于每台机器有80个线程并发的往RocketMQ集群写入消息。 - RocketMQ集群是**1主2从**组成的一个dledger模式的高可用集群,只有一个Master Broker会接收消息的写入。有2个Cosumer不停的从RocketMQ集群消费数据。[只主节点会接到数据的成功写入] - **每条数据的大小是500个字节**,这个非常关键,大家一定要牢记这个数字,因为这个数字是跟后续的**网卡流量**有关的。 - 一条消息从Producer生产出来到经过RocketMQ的Broker存储下来,再到被Consumer消费,基本上这个**时间跨度不会超过1秒钟**,这些这个性能是正常而且可以接受的。 - 同时在RocketMQ的管理工作台中可以看到,Master Broker的TPS(也就是每秒处理消息的数量),可以稳定的达到7万左右,也就是 每秒可以稳定处理7万消息。 #### (2) cpu负载情况 - 检查一下Broker机器上的CPU负载,可以通过top、uptime等命令来查看 - 执行top命令就可以看到cpu load和cpu使用率,这就代表了cpu的负载情况 - 在你执行了top命令之后,往往可以看到如下一行信息: - load average:12.03,12.05,12.08 - 类似上面那行信息代表的是cpu在1分钟、5分钟和15分钟内的cpu负载情况 - 比如我们一台机器是24核的,那么上面的12意思就是有12个核在使用中。换言之就是还有12个核其实还没使用,cpu还是有很大余力的。 - 这个cpu负载其实是比较好的,因为并没有让cpu负载达到极限。 #### (3) 内存使用率 - 使用free命令就可以查看到内存的使用率 - 根据当时的测试结果,机器上48G的内存,仅仅使用了一部分,还剩下很大一部分内存都是 空闲可用的,或者是被RocketMQ用来进行磁盘数据缓存了 - 所以内存负载是很低的 #### (4) JVM GC频率 - 使用jstat命令就可以查看RocketMQ的JVM的GC频率,基本上新生代每隔几十秒会垃圾回收一次,每次回收过后存活的对象很少,几乎不进入老年代 #### (5) 磁盘IO负载 - 首先可以用top命令查看一下IO等待占用CPU时间的百分比,你执行top命令之后,会看到一行类似下面的东西: - Cpu(s): 0.3% us, 0.3% sy, 0.0% ni, 76.7% id, 13.2% wa, 0.0% hi, 0.0% si - 在这里的13.2% wa,说的就是磁盘IO等待在CPU执行时间中的百分比 - 如果这个比例太高,说明CPU执行的时候大部分时间都在等待执行IO,也就说明IO负载很高,导致大量的IO等待 - 这个当时我们压测的时候,是在40%左右,说明IO等待时间占用CPU执行时间的比例在40%左右,这是相对高一些,但还是可以接受的,只不过如果继续让这个比例提高上去, 就很不靠谱了,因为说明磁盘IO负载可能过高了。 #### (6) 网卡流量 - 使用如下命令可以查看服务器的网卡流量: - sar -n DEV 1 2 - 通过这个命令就可以看到每秒钟网卡读写数据量了。当时我们的服务器使用的是千兆网卡,千兆网卡的理论上限是每秒传输128M数 据,但是一般实际最大值是每秒传输100M数据。 - 因此当时我们发现的一个问题就是,在RocketMQ处理到每秒7万消息的时候,每条消息500字节左右的大小的情况下,每秒网卡传输 数据量已经达到100M了,就是已经达到了网卡的一个极限值了。 - 因为一个Master Broker服务器,每秒不光是通过网络接收你写入的数据,还要把数据同步给两个Slave Broker,还有别的一些网络通 信开销。 - 因此实际压测发现,每条消息500字节,每秒7万消息的时候,服务器的网卡就几乎打满了,无法承载更多的消息了。 #### (7) 针对压测的一点小总结 - 通过本次压测做一些总结: - 当我们使用平均大小为500字节的消息时,最多就是做到RocketMQ单台服务器每秒7万的TPS,而且这个时候cpu负载、内存负 载、jvm gc负载、磁盘io负载,基本都还在正常范围内 - 只不过这个时候网卡流量基本已经打满了,无法再提升TPS了 - 因此在这样的一个机器配置下,RocketMQ一个比较靠谱的TPS就是7万左右 #### 总结 - 到底应该如何压测: - 应该在TPS和机器的cpu负载、内存使用率、jvm gc频率、磁盘io负载、网络流量负载之间取得一个平衡,尽量让 TPS尽可能的提高,同时让机器的各项资源负载不要太高。 - 实际压测过程: - 采用几台机器开启大量线程并发读写消息,然后观察TPS、cpu load(使用top命令)、内存使用率(使用free命 令)、jvm gc频率(使用jstat命令)、磁盘io负载(使用top命令)、网卡流量负载(使用sar命令),不断增加机器和线程,让TPS不 断提升上去,同时观察各项资源负载是否过高。 - 生产集群规划: - 根据公司的后台整体QPS来定,稍微多冗余部署一些机器即可,实际部署生产环境的集群时,使用高配置物理机,同时 合理调整os内核参数、jvm参数、中间件核心参数,如此即可 ## 7. 购物车、订单与支付场景抗压实战 -> 可以移步到 ## 8. RocketMQ分布式事务消息 ### 8.1 生产案例:从 RocketMQ 全链路分析一下为什么用户支付后没有收到红包? - 用户支付之后红包到底为什么没发送出去呢? - 其实原因有多种可能,比如订单系统推送消息到MQ就失败了,压根儿就没推送过去; - 或者是消息确实推送到MQ了,但是结果MQ自己机器故障,把消息搞丢了; - 或者是红包系统拿到了消息,但是他把消息搞丢了,结果红包还没来得及发。 - 如果真的在生产环境里要搞明白这个问题,就必须要打更多的日志去一点点分析消息到底是在哪个环节丢失了? - 如果订单系统推送了消息,结果红包系统连消息都没收到,那可能消息根本就没发到MQ去,或者MQ自己搞丢了消息。 - 如果红包系统收到了消息,结果红包没派发,那么就是红包系统搞丢了消息。 ## 9. RocketMQ顺序消费与微服务解耦 ## 10. 数据过滤与性能提升