You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

301 lines
11 KiB
Markdown

# RocketMQ
## 1. RocketMQ 基础
- 官方QuickStart: https://github.com/apache/rocketmq/tree/master/docs/cn
- 官方文档: https://rocketmq.apache.org/docs/
- 最新版本 4.9.4 【截止2022-7-20】
- 学习版本 4.3.0 【截止2022-7-20】
### 1.1 整体介绍
- RocketMQ是一款分布式、队列模型的消息中间件
- 特性:
- 支持分布式事物
- 支持集群模型、负载均衡、水平扩展能力
- 亿级别的消息堆积能力
- 采用零拷贝的原理、顺序写盘、随机读
- 丰富的API封装
- 代码优秀底层通信框架采用Netty NIO框架
- NameServer代替Zookeeper
- 集群无单点,可扩展,任意一点高可用,水平可扩展
- 消息失败重试机制,消息可查询
- 开源社区活跃、成熟度(经过双十一考验)
### 1.2 概念模型
- Producer: 消息生产者,负责生产消息,一般由业务系统负责产生消息
---
- Consumer: 消息消费者,负责消费消息,一般是后台系统负责异步消费
- Push Consumer: Consumer的一种, 需要向Consumer对象注册监听
- Pull Consumer: Consumer的一种, 需要主动请求Broker拉取消息
---
- Producer Group: 生产者集合, 一般用于发送一类消息
- Consumer Group: 消费者集合, 一般用于接收一类消息进行消费
---
- Broker: MQ消息服务(中转角色, 用于消息存储与生产消费转发)
- Message: TODO
### 1.3 源码工程讲解
- 编译完成的文件在 rocket-distribution/target/ 下
- RocketMQ的源码结构
- rocketmq-broker: 主要的业务逻辑, 消息收发, 主从同步, pagecache
- rocketmq-client: 客户端接口, 比如生产者和消费者
- rocketmq-example: 示例, 比如生产者和消费者
- rocketmq-common: 公用数据结构等等
- rocketmq-distribution: 编译模块, 编译输出等
- rocketmq-filter: 进行Broker过滤的不感兴趣的消息传输,减小带宽压力
- rocketmq-logappender, rocketmq-logging 日志相关
- rocketmq-namesrv Namesrv服务, 用于服务协调
- rocketmq-openmessaging 对外提供服务
- rocketmq-remoting 远程调用接口, 封装Netty底层通信
- rocketmq-util 提供一些公用的工具方法, 比如解析命令行参数
- rocketmq-store 消息存储
- rocketmq-test
- rocketmq-tools 管理工具, 比如有名的mqadmin工具
### 1.4 环境搭建
- 参考 RocketMQ使用手册.pdf
- ps: 内存大小至少也要1GB
- ps: 先启动NameSrv 再启动Broker
- ps: 测试使用1个配置文件即可
### 1.5 RocketMQ 控制台使用
- RocketMQ 扩展组件: https://github.com/apache/rocketmq-externals
- 可以集成控制台,修改控制台即可
- 集成SpringBoot也在这里
## 2. RocketMQ 基本入门
### 2.1 生产者的使用及使用控制台查消息
1. 创建生产者对象 DefaultMQProducer (必须要对生产者组的名称进行配置)
2. 设置 NamesrvAddr
3. 设置一些需要配置的参数
4. 启动生产者服务
5. 创建消息并发送
### 2.2 消费者的使用及Broker重试机制
1. 创建消费者对象 DefaultMQPushConsumer
2. 设置NamesrvAddr及其消费位置ConsumerFromWhere
3. 进行订阅主题 subscribe
4. 注册监听并消费 registerMessageListener
2 years ago
### 2.3 RocketMQ - 集群环境构建详解
- 单点模式
- 主从模式
- 双主模式
- 双主双从,多主多从模式(推荐)
- 主从模式环境构建可以保障消息的即时性与可靠性
- 投递一条消息后, 关闭主节点, 从节点继续可以提供消费者数据进行消费, 但是不能接收消息, 主节点上线后进行消费进行offset同步
- 集群的关闭和启动
参考手册-数据清理: RocketMQ使用手册.pdf
- 集群配置
参考手册-集群配置: RocketMQ使用手册.pdf
## 3. RocketMQ 生产者核心研究
### 3.1 RocketMQ生产者-核心参数讲解
- producerGroup: 组名
- createTopicKey: 创建Topic, 正式开发不能由开发组进行指定的, 对createTopicKey进行封装,不对外进行使用
- defaultTopicQueueNums (默认为4)
- sendMsgTimeout(单位: ms)
- compressMsgBodyOverHowmuch(默认压缩字节4096,默认有消息的压缩,可以提升发送性能)
- retryTimesWhenSendFailed(有同步的重发策略和同名的异步重发策略)
- retryAnotherBrokerWhenNotStoreOK(默认为false, 没存储成功使用其他的Broker进行存储)
- maxMessageSize(默认是128k)
- ...
### 3.2 主从同步机制分析
- 同步大致概念
- 同步信息: 数据内容 + 元数据信息
- 元数据同步: Broker角色识别, 为Slave则启动同步任务
- 消息同步: HAService, HAConnection, WaitNotifyObject
- 同步源码设计
- 首先明确同步元数据肯定是在Broker中,且肯定是执行定时任务。 从Broker中进入 org.apache.rocketmq.broker.BrokerController 387行
- ```java
...
// 如果角色是从节点
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
...
// 在固定的时间执行定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
...
// 启动定时任务进行同步元数据信息
BrokerController.this.slaveSynchronize.syncAll();
```
- 具体的同步逻辑看下 SlaveSynchronize 下的 syncAll()方法
- ```java
public void syncAll() {
// 同步 Topic 配置信息
this.syncTopicConfig();
// 同步消费者偏移量
this.syncConsumerOffset();
// 同步延迟偏移量
this.syncDelayOffset();
// 同步订阅组配置信息
this.syncSubscriptionGroupConfig();
}
```
- 查看 syncTopicConfig() 的同步逻辑
- ```java
...
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
...
// getAllTopicConfig(masterAddrBak) -> this.remotingClient.invokeSync(...)
...
// 进行同步调用 涉及到netty remoting模块的代码 NettyRemotingClient invokeSync(...)
// 算出极限耗时代价
long costTime = System.currentTimeMillis() - beginStartTime;
// 如果同步等待的时长 < 极限耗时 说明请求不成立 直接抛出 RemotingTimeoutException
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
// 同步请求 kernel
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
...
```
- invokeSyncImpl(...),真实的同步请求的实现
- ```java
...
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// opaque 相当于request ID, 每发送一次请求request,创建一个RemotingCommand实例,
final int opaque = request.getOpaque();
try {
// 根据opaque(request ID)构建 ResponseFuture
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// 将 ResponseFuture 放入 responseTable <KEY:RequestID, VALUE:ResponseFuture>
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// 使用 netty channel 来发送请求
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
// 轮询监听
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// OK setSendRequestOK 为 true
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
// 否则 setSendRequestOK 为 false
responseFuture.setSendRequestOK(false);
}
// 异常情况
// 设置 Map 快速清除
responseTable.remove(opaque);
// 设置异常
responseFuture.setCause(f.cause());
// 赋值为NULL
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
...
```
- 总结: RocketMQ Topic同步 底层实际是使用 Netty 做的同步通信
- TODO 其他的同步可以后续进行补充
2 years ago
---
- 数据同步逻辑在 org.apache.rocketmq.store.ha 下的 HAConnection HAService WaitNotifyObject 中
- HAConnection 主要是做数据的读写操作的
- 主要逻辑
- Master节点
- AcceptSocketService: 接收Slave节点连接
- HAConnection
- ReadSocketService: 读来自Slave节点的数据
- WriteSocketService: 写往Slave节点的数据
- Slave节点:
- HAService
- HAClient对Master节点连接, 读写请求
- 通信协议: Master节点与Slave节点 通信协议都很简单, 只有如下两条
- Slave -> Master 上报CommitLog已经同步到的物理位置
- Master -> Slave : 传输新的CommitLog数据
- TODO 后续自己补充源码
### 3.3 RocketMQ同步消息发送和异步发送消息
- 同步发送: producer.send(msg)
- 同步发送消息核心实现: DefaultMQProducerImpl
- 异步发送: producer.send(Message msg, SendCallback sendCallback)
- 异步发送消息核心实现: DefaultMQProducerImpl
### 3.4 Netty底层框架解析
- 核心类 RemotingService RemotingClient RemotingServer NettyRemotingClient NettyRemotingServer NettyRemotingAbstract
- 协议 消息长度+序列化类型&&头部长度+消息头数据+消息主体数据
### 3.5 RocketMQ生产者-消息返回状态详解
- SEND_OK
- FLUSH_DISK_TIMEOUT 要做补偿和可靠性设计
- FLUSH_SLAVE_TIMEOUT 要做补偿和可靠性设计
- SLAVE_NOT_AVAILABLE 要做补偿和可靠性设计
### 3.7 RocketMQ生产者-延迟消息
- 延迟消息: 消息发到Broker后, 要特定的时间才会被Consumer消费
- 目前只支持固定精度的定时消息[因为对消息的顺序有一定的要求,当我们不固定精度,会加大Broker的性能开销]
- MessageStoreConfig 配置类& ScheduleMessageService 任务类
- setDelayTimeLevel 方法设置
2 years ago
### 3.8 RocketMQ生产者-自定义消息发送策略
- 如何把消息发送到指定队列(Message Queue)
- MessageQueueSelector
- producer.send(Msg, selector, Obj)
2 years ago
## 4. RocketMQ 消费者核心研究
2 years ago
## 5. RocketMQ 核心原理研究
2 years ago
## 6. 双主双从部署
2 years ago
## 7. 购物车、订单与支付场景抗压实战 -> 可以移步到
2 years ago
## 8. RocketMQ分布式事务消息
2 years ago
## 9. RocketMQ顺序消费与微服务解耦
2 years ago
## 10. 数据过滤与性能提升