You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
11 KiB
11 KiB
RocketMQ
1. RocketMQ 基础
- 官方QuickStart: https://github.com/apache/rocketmq/tree/master/docs/cn
- 官方文档: https://rocketmq.apache.org/docs/
- 最新版本 4.9.4 【截止2022-7-20】
- 学习版本 4.3.0 【截止2022-7-20】
1.1 整体介绍
- RocketMQ是一款分布式、队列模型的消息中间件
- 特性:
- 支持分布式事物
- 支持集群模型、负载均衡、水平扩展能力
- 亿级别的消息堆积能力
- 采用零拷贝的原理、顺序写盘、随机读
- 丰富的API封装
- 代码优秀,底层通信框架采用Netty NIO框架
- NameServer代替Zookeeper
- 集群无单点,可扩展,任意一点高可用,水平可扩展
- 消息失败重试机制,消息可查询
- 开源社区活跃、成熟度(经过双十一考验)
1.2 概念模型
- Producer: 消息生产者,负责生产消息,一般由业务系统负责产生消息
- Consumer: 消息消费者,负责消费消息,一般是后台系统负责异步消费
- Push Consumer: Consumer的一种, 需要向Consumer对象注册监听
- Pull Consumer: Consumer的一种, 需要主动请求Broker拉取消息
- Producer Group: 生产者集合, 一般用于发送一类消息
- Consumer Group: 消费者集合, 一般用于接收一类消息进行消费
- Broker: MQ消息服务(中转角色, 用于消息存储与生产消费转发)
- Message: TODO
1.3 源码工程讲解
- 编译完成的文件在 rocket-distribution/target/ 下
- RocketMQ的源码结构
- rocketmq-broker: 主要的业务逻辑, 消息收发, 主从同步, pagecache
- rocketmq-client: 客户端接口, 比如生产者和消费者
- rocketmq-example: 示例, 比如生产者和消费者
- rocketmq-common: 公用数据结构等等
- rocketmq-distribution: 编译模块, 编译输出等
- rocketmq-filter: 进行Broker过滤的不感兴趣的消息传输,减小带宽压力
- rocketmq-logappender, rocketmq-logging 日志相关
- rocketmq-namesrv Namesrv服务, 用于服务协调
- rocketmq-openmessaging 对外提供服务
- rocketmq-remoting 远程调用接口, 封装Netty底层通信
- rocketmq-util 提供一些公用的工具方法, 比如解析命令行参数
- rocketmq-store 消息存储
- rocketmq-test
- rocketmq-tools 管理工具, 比如有名的mqadmin工具
1.4 环境搭建
- 参考 RocketMQ使用手册.pdf
- ps: 内存大小至少也要1GB
- ps: 先启动NameSrv 再启动Broker
- ps: 测试使用1个配置文件即可
1.5 RocketMQ 控制台使用
- RocketMQ 扩展组件: https://github.com/apache/rocketmq-externals
- 可以集成控制台,修改控制台即可
- 集成SpringBoot也在这里
2. RocketMQ 基本入门
2.1 生产者的使用及使用控制台查消息
- 创建生产者对象 DefaultMQProducer (必须要对生产者组的名称进行配置)
- 设置 NamesrvAddr
- 设置一些需要配置的参数
- 启动生产者服务
- 创建消息并发送
2.2 消费者的使用及Broker重试机制
- 创建消费者对象 DefaultMQPushConsumer
- 设置NamesrvAddr及其消费位置ConsumerFromWhere
- 进行订阅主题 subscribe
- 注册监听并消费 registerMessageListener
2.3 RocketMQ - 集群环境构建详解
-
单点模式
-
主从模式
-
双主模式
-
双主双从,多主多从模式(推荐)
- 主从模式环境构建可以保障消息的即时性与可靠性
- 投递一条消息后, 关闭主节点, 从节点继续可以提供消费者数据进行消费, 但是不能接收消息, 主节点上线后进行消费进行offset同步
-
集群的关闭和启动
参考手册-数据清理: RocketMQ使用手册.pdf
-
集群配置 参考手册-集群配置: RocketMQ使用手册.pdf
3. RocketMQ 生产者核心研究
3.1 RocketMQ生产者-核心参数讲解
- producerGroup: 组名
- createTopicKey: 创建Topic, 正式开发不能由开发组进行指定的, 对createTopicKey进行封装,不对外进行使用
- defaultTopicQueueNums (默认为4)
- sendMsgTimeout(单位: ms)
- compressMsgBodyOverHowmuch(默认压缩字节4096,默认有消息的压缩,可以提升发送性能)
- retryTimesWhenSendFailed(有同步的重发策略和同名的异步重发策略)
- retryAnotherBrokerWhenNotStoreOK(默认为false, 没存储成功使用其他的Broker进行存储)
- maxMessageSize(默认是128k)
- ...
3.2 主从同步机制分析
- 同步大致概念
- 同步信息: 数据内容 + 元数据信息
- 元数据同步: Broker角色识别, 为Slave则启动同步任务
- 消息同步: HAService, HAConnection, WaitNotifyObject
- 同步源码设计
- 首先明确同步元数据肯定是在Broker中,且肯定是执行定时任务。 从Broker中进入 org.apache.rocketmq.broker.BrokerController 387行
-
... // 如果角色是从节点 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { ... // 在固定的时间执行定时任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ... // 启动定时任务进行同步元数据信息 BrokerController.this.slaveSynchronize.syncAll();
- 具体的同步逻辑看下 SlaveSynchronize 下的 syncAll()方法
-
public void syncAll() { // 同步 Topic 配置信息 this.syncTopicConfig(); // 同步消费者偏移量 this.syncConsumerOffset(); // 同步延迟偏移量 this.syncDelayOffset(); // 同步订阅组配置信息 this.syncSubscriptionGroupConfig(); }
- 查看 syncTopicConfig() 的同步逻辑
-
... TopicConfigSerializeWrapper topicWrapper = this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); ... // getAllTopicConfig(masterAddrBak) -> this.remotingClient.invokeSync(...) ... // 进行同步调用 涉及到netty remoting模块的代码 NettyRemotingClient invokeSync(...) // 算出极限耗时代价 long costTime = System.currentTimeMillis() - beginStartTime; // 如果同步等待的时长 < 极限耗时 说明请求不成立 直接抛出 RemotingTimeoutException if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // 同步请求 kernel RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); ...
- invokeSyncImpl(...),真实的同步请求的实现
-
... public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { // opaque 相当于request ID, 每发送一次请求request,创建一个RemotingCommand实例, final int opaque = request.getOpaque(); try { // 根据opaque(request ID)构建 ResponseFuture final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 将 ResponseFuture 放入 responseTable <KEY:RequestID, VALUE:ResponseFuture> this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); // 使用 netty channel 来发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { // 轮询监听 @Override public void operationComplete(ChannelFuture f) throws Exception { // OK setSendRequestOK 为 true if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { // 否则 setSendRequestOK 为 false responseFuture.setSendRequestOK(false); } // 异常情况 // 设置 Map 快速清除 responseTable.remove(opaque); // 设置异常 responseFuture.setCause(f.cause()); // 赋值为NULL responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } ...
- 总结: RocketMQ Topic同步 底层实际是使用 Netty 做的同步通信
- TODO 其他的同步可以后续进行补充
-
数据同步逻辑在 org.apache.rocketmq.store.ha 下的 HAConnection HAService WaitNotifyObject 中
-
HAConnection 主要是做数据的读写操作的
-
主要逻辑
- Master节点:
- AcceptSocketService: 接收Slave节点连接
- HAConnection
- ReadSocketService: 读来自Slave节点的数据
- WriteSocketService: 写往Slave节点的数据
- Slave节点:
- HAService
- HAClient:对Master节点连接, 读写请求
- HAService
- Master节点:
-
通信协议: Master节点与Slave节点 通信协议都很简单, 只有如下两条
- Slave -> Master : 上报CommitLog已经同步到的物理位置
- Master -> Slave : 传输新的CommitLog数据
-
TODO 后续自己补充源码
3.3 RocketMQ同步消息发送和异步发送消息
- 同步发送: producer.send(msg)
- 同步发送消息核心实现: DefaultMQProducerImpl
- 异步发送: producer.send(Message msg, SendCallback sendCallback)
- 异步发送消息核心实现: DefaultMQProducerImpl
3.4 Netty底层框架解析
- 核心类 RemotingService RemotingClient RemotingServer NettyRemotingClient NettyRemotingServer NettyRemotingAbstract
- 协议 消息长度+序列化类型&&头部长度+消息头数据+消息主体数据
3.5 RocketMQ生产者-消息返回状态详解
- SEND_OK
- FLUSH_DISK_TIMEOUT 要做补偿和可靠性设计
- FLUSH_SLAVE_TIMEOUT 要做补偿和可靠性设计
- SLAVE_NOT_AVAILABLE 要做补偿和可靠性设计
3.7 RocketMQ生产者-延迟消息
- 延迟消息: 消息发到Broker后, 要特定的时间才会被Consumer消费
- 目前只支持固定精度的定时消息[因为对消息的顺序有一定的要求,当我们不固定精度,会加大Broker的性能开销]
- MessageStoreConfig 配置类& ScheduleMessageService 任务类
- setDelayTimeLevel 方法设置
3.8 RocketMQ生产者-自定义消息发送策略
- 如何把消息发送到指定队列(Message Queue)
- MessageQueueSelector
- producer.send(Msg, selector, Obj)