You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
土豆兄弟 340580bd52 更新了文档1 2 years ago
..
rocket-api fix commit error 2 years ago
springboot-rocketmq [新增功能](master): springboot-rocketmq整合 2 years ago
.gitattributes [新增功能](master): git配置 2 years ago
README.md 更新了文档1 2 years ago
RocketMQ使用手册.pdf [新增功能](master): 文档更新 2 years ago
RocketMQ用户指南v3.2.4.pdf [新增功能](master): 文档更新 2 years ago

README.md

RocketMQ

1. RocketMQ 基础

1.1 整体介绍

  • RocketMQ是一款分布式、队列模型的消息中间件
  • 特性:
    • 支持分布式事物
    • 支持集群模型、负载均衡、水平扩展能力
    • 亿级别的消息堆积能力
    • 采用零拷贝的原理、顺序写盘、随机读
    • 丰富的API封装
    • 代码优秀底层通信框架采用Netty NIO框架
    • NameServer代替Zookeeper
    • 集群无单点,可扩展,任意一点高可用,水平可扩展
    • 消息失败重试机制,消息可查询
    • 开源社区活跃、成熟度(经过双十一考验)
  • 理解的问题:
    • MQ如何集群化部署来支撑高并发访问?
    • MQ如果要存储海量消息应该怎么做?
    • 高可用保障: 万一Broker宕机了怎么办?
    • 数据路由: 怎么知道访问哪个Broker?

1.2 概念模型

  • Producer: 消息生产者,负责生产消息,一般由业务系统负责产生消息

  • Consumer: 消息消费者,负责消费消息,一般是后台系统负责异步消费
    • Push Consumer: Consumer的一种, 需要向Consumer对象注册监听
    • Pull Consumer: Consumer的一种, 需要主动请求Broker拉取消息

  • Producer Group: 生产者集合, 一般用于发送一类消息
  • Consumer Group: 消费者集合, 一般用于接收一类消息进行消费

  • Broker: MQ消息服务(中转角色, 用于消息存储与生产消费转发)
  • Message: TODO

1.3 源码工程讲解

  • 编译完成的文件在 rocket-distribution/target/ 下
  • RocketMQ的源码结构
    • rocketmq-broker: 主要的业务逻辑, 消息收发, 主从同步, pagecache
    • rocketmq-client: 客户端接口, 比如生产者和消费者
    • rocketmq-example: 示例, 比如生产者和消费者
    • rocketmq-common: 公用数据结构等等
    • rocketmq-distribution: 编译模块, 编译输出等
    • rocketmq-filter: 进行Broker过滤的不感兴趣的消息传输,减小带宽压力
    • rocketmq-logappender, rocketmq-logging 日志相关
    • rocketmq-namesrv Namesrv服务, 用于服务协调
    • rocketmq-openmessaging 对外提供服务
    • rocketmq-remoting 远程调用接口, 封装Netty底层通信
    • rocketmq-util 提供一些公用的工具方法, 比如解析命令行参数
    • rocketmq-store 消息存储
    • rocketmq-test
    • rocketmq-tools 管理工具, 比如有名的mqadmin工具

1.4 环境搭建

  • 参考 RocketMQ使用手册.pdf
  • ps: 内存大小至少也要1GB
  • ps: 先启动NameSrv 再启动Broker
  • ps: 测试使用1个配置文件即可

1.5 RocketMQ 控制台使用

2. RocketMQ 基本入门

2.1 生产者的使用及使用控制台查消息

  1. 创建生产者对象 DefaultMQProducer (必须要对生产者组的名称进行配置)
  2. 设置 NamesrvAddr
  3. 设置一些需要配置的参数
  4. 启动生产者服务
  5. 创建消息并发送

2.2 消费者的使用及Broker重试机制

  1. 创建消费者对象 DefaultMQPushConsumer
  2. 设置NamesrvAddr及其消费位置ConsumerFromWhere
  3. 进行订阅主题 subscribe
  4. 注册监听并消费 registerMessageListener

2.3 RocketMQ - 集群环境构建详解

  • 单点模式

  • 主从模式

  • 双主模式

  • 双主双从,多主多从模式(推荐)

    • 主从模式环境构建可以保障消息的即时性与可靠性
    • 投递一条消息后, 关闭主节点, 从节点继续可以提供消费者数据进行消费, 但是不能接收消息, 主节点上线后进行消费进行offset同步
  • 集群的关闭和启动

    参考手册-数据清理: RocketMQ使用手册.pdf

  • 集群配置 参考手册-集群配置: RocketMQ使用手册.pdf

3. RocketMQ 生产者核心研究

3.1 RocketMQ生产者-核心参数讲解

  • producerGroup: 组名
  • createTopicKey: 创建Topic, 正式开发不能由开发组进行指定的, 对createTopicKey进行封装,不对外进行使用
  • defaultTopicQueueNums (默认为4)
  • sendMsgTimeout(单位: ms)
  • compressMsgBodyOverHowmuch(默认压缩字节4096,默认有消息的压缩,可以提升发送性能)
  • retryTimesWhenSendFailed(有同步的重发策略和同名的异步重发策略)
  • retryAnotherBrokerWhenNotStoreOK(默认为false, 没存储成功使用其他的Broker进行存储)
  • maxMessageSize(默认是128k)
  • ...

3.2 主从同步机制分析

  • 同步大致概念
    • 同步信息: 数据内容 + 元数据信息
    • 元数据同步: Broker角色识别, 为Slave则启动同步任务
    • 消息同步: HAService, HAConnection, WaitNotifyObject
  • 同步源码设计
  • 首先明确同步元数据肯定是在Broker中,且肯定是执行定时任务。 从Broker中进入 org.apache.rocketmq.broker.BrokerController 387行
  • ...
    // 如果角色是从节点
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    ...
     // 在固定的时间执行定时任务
     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        ... 
        // 启动定时任务进行同步元数据信息
       BrokerController.this.slaveSynchronize.syncAll();   
    
  • 具体的同步逻辑看下 SlaveSynchronize 下的 syncAll()方法
  • public void syncAll() {
        // 同步 Topic 配置信息
        this.syncTopicConfig();
        // 同步消费者偏移量
        this.syncConsumerOffset();
        // 同步延迟偏移量 
        this.syncDelayOffset();
        // 同步订阅组配置信息
        this.syncSubscriptionGroupConfig();
    }
    
  • 查看 syncTopicConfig() 的同步逻辑
  • ...
    TopicConfigSerializeWrapper topicWrapper =
    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
    ...
    // getAllTopicConfig(masterAddrBak) -> this.remotingClient.invokeSync(...)
    ... 
    // 进行同步调用 涉及到netty remoting模块的代码 NettyRemotingClient invokeSync(...)
    // 算出极限耗时代价
    long costTime = System.currentTimeMillis() - beginStartTime;
    // 如果同步等待的时长 < 极限耗时 说明请求不成立 直接抛出 RemotingTimeoutException
    if (timeoutMillis < costTime) {
        throw new RemotingTimeoutException("invokeSync call timeout");
    }
    // 同步请求 kernel
    RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
    ... 
    
    
  • invokeSyncImpl(...),真实的同步请求的实现
  • ...
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
          final long timeoutMillis)
          throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
          // opaque 相当于request ID, 每发送一次请求request,创建一个RemotingCommand实例, 
          final int opaque = request.getOpaque();
    
          try {
              // 根据opaque(request ID)构建 ResponseFuture
              final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
              // 将 ResponseFuture 放入 responseTable <KEY:RequestID, VALUE:ResponseFuture>
              this.responseTable.put(opaque, responseFuture);
              final SocketAddress addr = channel.remoteAddress();
              // 使用 netty channel 来发送请求
              channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                  // 轮询监听
                  @Override
                  public void operationComplete(ChannelFuture f) throws Exception {
                      // OK setSendRequestOK 为 true
                      if (f.isSuccess()) {
                          responseFuture.setSendRequestOK(true);
                          return;
                      } else {
                          // 否则 setSendRequestOK 为 false
                          responseFuture.setSendRequestOK(false);
                      }
                      // 异常情况
                      // 设置 Map 快速清除
                      responseTable.remove(opaque);
                      // 设置异常
                      responseFuture.setCause(f.cause());
                      // 赋值为NULL
                      responseFuture.putResponse(null);
                      log.warn("send a request command to channel <" + addr + "> failed.");
                  }
              });
    
              RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
              if (null == responseCommand) {
                  if (responseFuture.isSendRequestOK()) {
                      throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                          responseFuture.getCause());
                  } else {
                      throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                  }
              }
    
              return responseCommand;
          } finally {
              this.responseTable.remove(opaque);
          }
      }
    
    ...
    
    
    
  • 总结: RocketMQ Topic同步 底层实际是使用 Netty 做的同步通信
  • TODO 其他的同步可以后续进行补充

  • 数据同步逻辑在 org.apache.rocketmq.store.ha 下的 HAConnection HAService WaitNotifyObject 中

  • HAConnection 主要是做数据的读写操作的

  • 主要逻辑

    • Master节点
      • AcceptSocketService: 接收Slave节点连接
      • HAConnection
        • ReadSocketService: 读来自Slave节点的数据
        • WriteSocketService: 写往Slave节点的数据
    • Slave节点:
      • HAService
        • HAClient对Master节点连接, 读写请求
  • 通信协议: Master节点与Slave节点 通信协议都很简单, 只有如下两条

    • Slave -> Master 上报CommitLog已经同步到的物理位置
    • Master -> Slave : 传输新的CommitLog数据
  • TODO 后续自己补充源码

3.3 RocketMQ同步消息发送和异步发送消息

  • 同步发送: producer.send(msg)
  • 同步发送消息核心实现: DefaultMQProducerImpl
  • 异步发送: producer.send(Message msg, SendCallback sendCallback)
  • 异步发送消息核心实现: DefaultMQProducerImpl

3.4 Netty底层框架解析

  • 核心类 RemotingService RemotingClient RemotingServer NettyRemotingClient NettyRemotingServer NettyRemotingAbstract
  • 协议 消息长度+序列化类型&&头部长度+消息头数据+消息主体数据

3.5 RocketMQ生产者-消息返回状态详解

  • SEND_OK
  • FLUSH_DISK_TIMEOUT 要做补偿和可靠性设计
  • FLUSH_SLAVE_TIMEOUT 要做补偿和可靠性设计
  • SLAVE_NOT_AVAILABLE 要做补偿和可靠性设计

3.7 RocketMQ生产者-延迟消息

  • 延迟消息: 消息发到Broker后, 要特定的时间才会被Consumer消费
  • 目前只支持固定精度的定时消息[因为对消息的顺序有一定的要求,当我们不固定精度,会加大Broker的性能开销]
  • MessageStoreConfig 配置类& ScheduleMessageService 任务类
  • setDelayTimeLevel 方法设置

3.8 RocketMQ生产者-自定义消息发送策略

  • 如何把消息发送到指定队列(Message Queue)
  • MessageQueueSelector
  • producer.send(Msg, selector, Obj)

4. RocketMQ 消费者核心研究

5. RocketMQ 核心原理研究

6. 双主双从部署

6.1 设计生产的架构

  • NameServer集群化部署, 保证高可用, 每台NameServer实际上都会有完整的集群路由信息包括所有的Broker节点信息我们的数据信息
  • 基于Dledger的Broker主从架构部署
    • RocketMQ 4.5以前的那种普通的Master-Slave架构来部署能在一定程度上保证数据不丢失也能保证一定的可用性
    • 但是那种方式的缺陷是很明显的最大的问题就是当Master Broker挂了之后没办法让Slave Broker自动切换为新的Master Broker需要手工做一些运维操作修改配置以及重启机器才行这个非常麻烦。
    • Dledger技术是要求至少得是一个Master带两个Slave这样有三个Broke组成一个Group也就是作为一个分组来运行。一旦 Master宕机他就可以从剩余的两个Slave中选举出来一个新的Master对外提供服务。
  • Broker是如何跟NameServer进行通信的
    • Broker会每隔30秒发送心跳到所有的NameServer 上去然后每个NameServer都会每隔10s检查一次有没有哪个Broker超过120s没发送心跳的如果有就认为那个Broker已经宕机 了从路由信息里要摘除这个Broker。
    • RocketMQ的实现中采用的是TCP长连接进行通信。
    • Broker会跟每个NameServer都建立一个TCP长连接然后定时通过TCP长连接发送心跳请求过去
  • 使用MQ的系统都要多机器集群部署
    • 无论作为生产者还是消费者的系统,都应 该多机器集群化部署,保证他自己本身作为生产者或者消费者的高可用性
  • MQ的核心数据模型:Topic到底是什么?
    • 就是一个数据集合的意思
    • 系统如果要往MQ里写入消息或者获取消息首先得创建一些Topic作为数据集合存放不同类型的消息
  • Topic作为一个数据集合是怎么在Broker集群里存储的
    • 我们可以在创建Topic的时候指定让他里面的数据分散存储在多台Broker机器上比如一个Topic里有1000万条数据此时有2台 Broker那么就可以让每台Broker上都放500万条数据
  • 生产者系统是如何将消息发送给Broker的?
    • 生产者一定是投递消息到Master Broker的然后Master Broker会同步数据给他的Slave Brokers实现 一份数据多份副本保证Master故障的时候数据不丢失而且可以自动把Slave切换为Master提供服务。
  • 消费者是如何从Broker上拉取消息的?
    • 消费者系统可能会从Master Broker拉取消息也可能从Slave Broker拉取消息都有可能一切都看具体 情况。

6.2 部署机器配置

  • NameServer:3台机器每台机器都是8核CPU + 16G内存 + 500G磁盘 + 千兆网卡

  • Broker:3台机器每台机器都是24核CPU(两颗x86_64 cpu每颗cpu是12核) + 48G内存 + 1TB磁盘 + 千兆网卡

  • 生产者:2台机器每台机器都是4核CPU + 8G内存 + 500GB磁盘 + 千兆网卡

  • 消费者:2台机器每台机器都是4核CPU + 8G内存 + 500GB磁盘 + 千兆网卡

  • NameServer是核心的路由服务所以给8核16G的较高配置的机器但是他一般就是承载Broker注册和心跳、系统的路由表拉取等请 求,负载其实很低,因此不需要特别高的机器配置,部署三台也可以实现高可用的效果了。

  • Broker是最负载最高的未来要承载高并发写入和海量数据存储所以把最高配置的机器都会留给他这里用3台机器组成一个“单 Master + 双Slave”的集群。

  • 生产者和消费者机器都是临时用来测试的而且一般他们都是业务系统只会部署在标准的4核8G的机器配置下。

6.3 参数调整

  • PS: 中间件系统在压测或者上生产之前,需要对三大块参数进行调整:OS内核参数、JVM参数以及中间件核心参数
  • PS: OS内核参数主要调整的地方都是跟磁盘IO、网络通信、内存管理以及线程管理有关的需要适当调节大小
  • PS: JVM参数需要我们去中间件系统的启动脚本中寻找他的默认JVM参数然后根据机器的情况对JVM的堆内存大小新生代大 小Direct Buffer大小等等做出一些调整发挥机器的资源
  • PS: 中间件核心参数主要也是关注其中跟网络通信、磁盘IO、线程数量、内存 管理相关的,根据机器资源,适当可以增加网络通信线程,控制同步刷磁盘或者异步刷磁盘,线程数量有多少,内存中一些队列的大小

A. 对RocketMQ集群进行OS内核参数的调整

(1) vm.overcommit_memory

  • “vm.overcommit_memory”这个参数有三个值可以选择0、1、2。
    • 0: 如果值是0的话在你的中间件系统申请内存的时候os内核会检查可用内存是否足够如果足够的话就分配内存给你如果感觉剩余 内存不是太够了,干脆就拒绝你的申请,导致你申请内存失败,进而导致中间件系统异常出错。
    • 1: 一般需要将这个参数的值调整为1意思是把所有可用的物理内存都允许分配给你只要有内存就给你来用这样可以避免申请内存失败的问题。

可以用如下命令修改: echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf

(2) vm.max_map_count

  • 这个参数的值会影响中间件系统可以开启的线程的数量,同样也是非常重要的
    • 如果这个参数过小,有的时候可能会导致有些中间件无法开启足够的线程,进而导致报错,甚至中间件系统挂掉。
    • 他的默认值是65536但是这个值有时候是不够的比如大数据团队的生产环境部署的Kafka集群曾经有一次就报出过这个异常说无法开启足够多的线程直接导致Kafka宕机了。
    • 因此建议可以把这个参数调大10倍比如655360这样的值保证中间件可以开启足够多的线程。

可以用如下命令修改:echo 'vm.max_map_count=655360' >> /etc/sysctl.conf

(3) vm.swappiness

  • 这个参数是用来控制进程的swap行为的这个简单来说就是os会把一部分磁盘空间作为swap区域然后如果有的进程现在可能不是太 活跃, 就会被操作系统把进程调整为睡眠状态把进程中的数据放入磁盘上的swap区域然后让这个进程把原来占用的内存空间腾出 来,交给其他活跃运行的进程来使用。
    • 如果这个参数的值设置为0意思就是尽量别把任何一个进程放到磁盘swap区域去尽量大家都用物理内存。
    • 如果这个参数的值是100那么意思就是尽量把一些进程给放到磁盘swap区域去内存腾出来给活跃的进程使用。
    • 默认这个参数的值是60有点偏高了可能会导致我们的中间件运行不活跃的时候被迫腾出内存空间然后放磁盘swap区域去。
    • 因此通常在生产环境建议把这个参数调整小一些比如设置为10尽量用物理内存别放磁盘swap区域去。

可以用如下命令修改:echo 'vm.swappiness=10' >> /etc/sysctl.conf

(4) ulimit

  • 这个是用来控制linux上的最大文件链接数的默认值可能是1024一般肯定是不够的因为你在大量频繁的读写磁盘文件的时候或 者是进行网络通信的时候,都会跟这个参数有关系
    • 对于一个中间件系统而言肯定是不能使用默认值的,如果你采用默认值,很可能在线上会出现如下错误:error: too many open files

因此通常建议用如下命令修改这个值:echo 'ulimit -n 1000000' >> /etc/profile

(5) 总结:

  • 要调整的东西无非都是跟磁盘文件IO、网络通信、内存管理、线程数量有关系的 因为我们的中间件系统在运行的时候无非就是跟这些打交道
    • 中间件系统肯定要开启大量的线程(跟vm.max_map_count有关)
    • 而且要进行大量的网络通信和磁盘IO(跟ulimit有关)
    • 然后大量的使用内存(跟vm.swappiness和vm.overcommit_memory有关)
  • 所以对OS内核参数的调整往往也就是围绕跟中间件系统运行最相关的一些东西

B. 对JVM参数进行调整

  • 在rocketmq/distribution/target/apache-rocketmq/bin目录下就有对应的启动脚本比如:
    • mqbroker是用来启动Broker的
    • mqnamesvr是用来启动NameServer的
  • 用mqbroker来举例我们查看这个脚本里的内容最后有如下一行:
    • sh {ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup @
    • 这一行内容就是用runbroker.sh脚本来启动一个JVM进程JVM进程刚开始执行的main类就是 org.apache.rocketmq.broker.BrokerStartup
  • runbroker.sh脚本在里面可以看到如下内容:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 - XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps - XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
  • 在上面的内容中其实就是在为启动Broker设置对应的JVM参数和其他一些参数
  • PS: 狸猫技术窝的《从0开始带你成为JVM实战高手》这个专栏, 可以去学习MQ相关的内容

  • [了解] -server:这个参数就是说用服务器模式启动,这个没什么可说的,现在一般都是如此
  • [重点调整] -Xms8g -Xmx8g -Xmn4g:这个就是很关键的一块参数了也是重点需要调整的就是默认的堆大小是8g内存新生代是4g内存 如果我们的高配物理机是48g内存的
    • 所以这里完全可以给他们翻几倍比如给堆内存20g其中新生代给10g甚至可以更多一些当然要留一些内存给操作系统来用,但是要注意最小不能低于1g
  • [重点调整] -XX:+UseG1GC -XX:G1HeapRegionSize=16m:这几个参数也是至关重要的这是选用了G1垃圾回收器来做分代回收对新生代 和老年代都是用G1来回收
    • 这里把G1的region大小设置为了16m这个因为机器内存比较多所以region大小可以调大一些给到16m不然用2m的region会导致region数量过多的
  • [了解] -XX:G1ReservePercent=25:这个参数是说在G1管理的老年代里预留25%的空闲内存,保证新生代对象晋升到老年代的时候有足 够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了
    • 默认值是10%略微偏少这里RocketMQ给调大了一些
  • [了解] -XX:InitiatingHeapOccupancyPercent=30:这个参数是说当堆内存的使用率达到30%之后就会自动启动G1的并发垃圾回收开始尝试回收一些垃圾对象
    • 默认值是45%这里调低了一些也就是提高了GC的频率但是避免了垃圾对象过多一次垃圾回收耗时过长的问题
  • [重点调整] -XX:SoftRefLRUPolicyMSPerMB=0:这个参数默认设置为0了
    • 在JVM优化专栏中救火队队长讲过这个参数引发的案例其实建议这个参数不要设置为0避免频繁回收一些软引用的Class对象这里可以调整为比如1000
  • [了解] -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps - XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation - XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m:这一堆参数都是控制GC日志打印输出的确定了gc日志文件的地址要 打印哪些详细信息然后控制每个gc日志文件的大小是30m最多保留5个gc日志文件。
  • [了解] -XX:-OmitStackTraceInFastThrow:这个参数是说有时候JVM会抛弃一些异常堆栈信息因此这个参数设置之后就是禁用这个特性要把完整的异常堆栈信息打印出来
  • [了解] -XX:+AlwaysPreTouch:这个参数的意思是我们刚开始指定JVM用多少内存不会真正分配给他会在实际需要使用的时候再分配给 他
    • 所以使用这个参数之后就是强制让JVM启动的时候直接分配我们指定的内存不要等到使用内存的时候再分配
  • [可以调整] -XX:MaxDirectMemorySize=15g:这是说RocketMQ里大量用了NIO中的direct buffer这里限定了direct buffer最多申请多少 如果你机器内存比较大可以适当调大这个值如果有朋友不了解direct buffer是什么可以自己查阅一些资料。
  • [了解] -XX:-UseLargePages -XX:-UseBiasedLocking:这两个参数的意思是禁用大内存页和偏向锁,这两个参数对应的概念每个要说清楚 都得一篇文章,所以这里大家直接知道人家禁用了两个特性即可。

  • 总结:
    • RocketMQ默认的JVM参数是采用了G1垃圾回收器默认堆内存大小是8G
    • 这个其实完全可以根据大家的机器内存来调整你可以增大一些也是没有问题的然后就是一些G1的垃圾回收的行为参数做了调整 这个一般我们不用去动然后就是对GC日志打印做了设置这个一般也不用动。
    • 其余的就是禁用一些特性开启一些特性这些都直接维持RocketMQ的默认值即可

C. 对RocketMQ核心参数进行调整

  • 在下面的目录里有dledger的示例配置文件:rocketmq/distribution/target/apache- rocketmq/conf/dledger
  • 在这里主要是有一个较为核心的参数:sendMessageThreadPoolNums=16
    • 这个参数的意思就是RocketMQ内部用来发送消息的线程池的线程数量默认是16
    • 其实这个参数可以根据你的机器的CPU核数进行适当增加比如机器CPU是24核的可以增加这个线程数量到24或者30都是可以 的

6.4 压测

  • 我们在压测的时候一方面要关注RocketMQ能抗下多少TPS一方面还要关注RocketMQ部署的几台机器的资源使用率和负载情况
  • 我们平时做压测,主要关注的还是要压测出来一个最合适的最高负载。
    • 意思就是在RocketMQ的TPS和机器的资源使用率和负载之间取得一个平衡。
  • 以下压测过程以及压测结果都是根据我们之前真实的RocketMQ压测报告总结而来非常的有代表性大家完全可以结合我们之前说的机器配置来参考一下

(1) RocketMQ的TPS和消息延时

  • 我们让两个Producer不停的往RocketMQ集群发送消息每个Producer所在机器启动了80个线程相当于每台机器有80个线程并发的往RocketMQ集群写入消息。
  • RocketMQ集群是1主2从组成的一个dledger模式的高可用集群只有一个Master Broker会接收消息的写入。有2个Cosumer不停的从RocketMQ集群消费数据。[只主节点会接到数据的成功写入]
  • 每条数据的大小是500个字节,这个非常关键,大家一定要牢记这个数字,因为这个数字是跟后续的网卡流量有关的。
  • 一条消息从Producer生产出来到经过RocketMQ的Broker存储下来再到被Consumer消费基本上这个时间跨度不会超过1秒钟,这些这个性能是正常而且可以接受的。
  • 同时在RocketMQ的管理工作台中可以看到Master Broker的TPS(也就是每秒处理消息的数量)可以稳定的达到7万左右也就是 每秒可以稳定处理7万消息。

(2) cpu负载情况

  • 检查一下Broker机器上的CPU负载可以通过top、uptime等命令来查看
  • 执行top命令就可以看到cpu load和cpu使用率这就代表了cpu的负载情况
    • 在你执行了top命令之后往往可以看到如下一行信息:
    • load average:12.0312.0512.08
    • 类似上面那行信息代表的是cpu在1分钟、5分钟和15分钟内的cpu负载情况
    • 比如我们一台机器是24核的那么上面的12意思就是有12个核在使用中。换言之就是还有12个核其实还没使用cpu还是有很大余力的。
  • 这个cpu负载其实是比较好的因为并没有让cpu负载达到极限。

(3) 内存使用率

  • 使用free命令就可以查看到内存的使用率
  • 根据当时的测试结果机器上48G的内存仅仅使用了一部分还剩下很大一部分内存都是 空闲可用的或者是被RocketMQ用来进行磁盘数据缓存了
  • 所以内存负载是很低的

(4) JVM GC频率

  • 使用jstat命令就可以查看RocketMQ的JVM的GC频率基本上新生代每隔几十秒会垃圾回收一次每次回收过后存活的对象很少几乎不进入老年代

(5) 磁盘IO负载

  • 首先可以用top命令查看一下IO等待占用CPU时间的百分比你执行top命令之后会看到一行类似下面的东西:
  • Cpu(s): 0.3% us, 0.3% sy, 0.0% ni, 76.7% id, 13.2% wa, 0.0% hi, 0.0% si
  • 在这里的13.2% wa说的就是磁盘IO等待在CPU执行时间中的百分比
  • 如果这个比例太高说明CPU执行的时候大部分时间都在等待执行IO也就说明IO负载很高导致大量的IO等待
  • 这个当时我们压测的时候是在40%左右说明IO等待时间占用CPU执行时间的比例在40%左右,这是相对高一些,但还是可以接受的,只不过如果继续让这个比例提高上去, 就很不靠谱了因为说明磁盘IO负载可能过高了。

(6) 网卡流量

  • 使用如下命令可以查看服务器的网卡流量:
  • sar -n DEV 1 2
  • 通过这个命令就可以看到每秒钟网卡读写数据量了。当时我们的服务器使用的是千兆网卡千兆网卡的理论上限是每秒传输128M数 据但是一般实际最大值是每秒传输100M数据。
  • 因此当时我们发现的一个问题就是在RocketMQ处理到每秒7万消息的时候每条消息500字节左右的大小的情况下每秒网卡传输 数据量已经达到100M了就是已经达到了网卡的一个极限值了。
  • 因为一个Master Broker服务器每秒不光是通过网络接收你写入的数据还要把数据同步给两个Slave Broker还有别的一些网络通 信开销。
  • 因此实际压测发现每条消息500字节每秒7万消息的时候服务器的网卡就几乎打满了无法承载更多的消息了。

(7) 针对压测的一点小总结

  • 通过本次压测做一些总结:
    • 当我们使用平均大小为500字节的消息时最多就是做到RocketMQ单台服务器每秒7万的TPS而且这个时候cpu负载、内存负 载、jvm gc负载、磁盘io负载基本都还在正常范围内
    • 只不过这个时候网卡流量基本已经打满了无法再提升TPS了
    • 因此在这样的一个机器配置下RocketMQ一个比较靠谱的TPS就是7万左右

总结

  • 到底应该如何压测:
    • 应该在TPS和机器的cpu负载、内存使用率、jvm gc频率、磁盘io负载、网络流量负载之间取得一个平衡尽量让 TPS尽可能的提高同时让机器的各项资源负载不要太高。
  • 实际压测过程:
    • 采用几台机器开启大量线程并发读写消息然后观察TPS、cpu load(使用top命令)、内存使用率(使用free命 令)、jvm gc频率(使用jstat命令)、磁盘io负载(使用top命令)、网卡流量负载(使用sar命令)不断增加机器和线程让TPS不 断提升上去,同时观察各项资源负载是否过高。
  • 生产集群规划:
    • 根据公司的后台整体QPS来定稍微多冗余部署一些机器即可实际部署生产环境的集群时使用高配置物理机同时 合理调整os内核参数、jvm参数、中间件核心参数如此即可

7. 购物车、订单与支付场景抗压实战 -> 可以移步到

8. RocketMQ分布式事务消息

8.1 生产案例:从 RocketMQ 全链路分析一下为什么用户支付后没有收到红包?

  • 用户支付之后红包到底为什么没发送出去呢?
    • 其实原因有多种可能比如订单系统推送消息到MQ就失败了压根儿就没推送过去;
    • 或者是消息确实推送到MQ了但是结果MQ自己机器故障把消息搞丢了;
    • 或者是红包系统拿到了消息,但是他把消息搞丢了,结果红包还没来得及发。
    • 如果真的在生产环境里要搞明白这个问题,就必须要打更多的日志去一点点分析消息到底是在哪个环节丢失了?
    • 如果订单系统推送了消息结果红包系统连消息都没收到那可能消息根本就没发到MQ去或者MQ自己搞丢了消息。
    • 如果红包系统收到了消息,结果红包没派发,那么就是红包系统搞丢了消息。

9. RocketMQ顺序消费与微服务解耦

10. 数据过滤与性能提升