From 08b87590cd9d03c805da746fd75c77205ac55e4a Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Fri, 6 Sep 2024 16:52:40 +0800 Subject: [PATCH] =?UTF-8?q?feat(master):RMQ=20=E6=80=BB=E7=BB=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RMQ 的相关代码 --- .../SpringCloud-Stream.md | 29 +++-- .../controller/RocketMQController.java | 51 +++++++++ .../rocket/RocketMQConsumerMessageExt.java | 29 +++++ .../rocket/RocketMQConsumerObject.java | 27 +++++ .../rocket/RocketMQConsumerString.java | 28 +++++ .../rocket/RocketMQConsumerTagString.java | 28 +++++ .../org/example/rocket/RocketMQProducer.java | 100 ++++++++++++++++++ .../resources/http/rocket-mq-controller.http | 8 +- .../src/main/resources/bootstrap.yml | 12 +-- 9 files changed, 296 insertions(+), 16 deletions(-) create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/RocketMQController.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerMessageExt.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerObject.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerString.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerTagString.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQProducer.java diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md index f15be38..703e3f7 100644 --- a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md @@ -3,21 +3,38 @@ ### SpringBoot 集成 Kafka 构建消息驱动微服务 -- +- 下载安装 kafka +- 下载 Kafka :https://kafka.apache.org/quickstart +- 解压、启动 ZK 和 Kafka Server 即可(使用默认配置) +--- +- 基本架构 + - Producer -> Message[Topic] -> Kafka Broker -> Partition <- Consumer[Topic] ### SpringBoot 集成 RocketMQ 构建消息驱动微服务 -- +- 下载、安装 RocketMQ + - 下载 RocketMQ: http://rocketmg.apache.org/docs/quick-start + - 下载以 bin-release 结尾的 zip 包解压即完成安装 +--- +- MQ 的启动, 关注2个 + - mqnamesrv + - sh mqnamesrv + - mqbroker + - sh mqbroker -n localhost:9876 ### SpringCloud Stream 消息驱动组件概览 - +- 为什么会出现 SpringCloud Stream + - 如果没有 SpringCloud Stream,我们会怎么应用消息驱动? + - Producer -> Message -> Kafka/RocketMQ <- Consumer + - 有了Stream + - Producer -> Message -> [Kafka/RocketMQ][Stream] <- Consumer ### 基于 SpringCloud Stream 消息驱动的简单应用 - +- [dev-protocol-springcloud-stream](..%2Fdev-protocol-springcloud-stream) ### 自定义 Stream 消息通信信道实现定制分发 - +- [dev-protocol-springcloud-stream](..%2Fdev-protocol-springcloud-stream) ### SpringCloud Stream 消息分组和消费分区的配置与说明 - +- [dev-protocol-springcloud-stream](..%2Fdev-protocol-springcloud-stream) ### SpringCloud Stream 消息驱动组件总结 diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/RocketMQController.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/RocketMQController.java new file mode 100644 index 0000000..8d2bb65 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/RocketMQController.java @@ -0,0 +1,51 @@ +package org.example.controller; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.example.rocket.RocketMQProducer; +import org.example.vo.QMessage; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + *

SpringBoot 集成 RocketMQ

+ * */ +@Slf4j +@RestController +@RequestMapping("/rocket-mq") +public class RocketMQController { + + private static final QMessage RocketMQMessage = new QMessage( + 1, + "Q-Study-RocketMQ-In-SpringBoot" + ); + + private final RocketMQProducer rocketMQProducer; + + public RocketMQController(RocketMQProducer rocketMQProducer) { + this.rocketMQProducer = rocketMQProducer; + } + + @GetMapping("/message-with-value") + public void sendMessageWithValue() { + rocketMQProducer.sendMessageWithValue(JSON.toJSONString(RocketMQMessage)); + } + + @GetMapping("/message-with-key") + public void sendMessageWithKey() { + rocketMQProducer.sendMessageWithKey("qy", JSON.toJSONString(RocketMQMessage)); + } + + @GetMapping("/message-with-tag") + public void sendMessageWithTag() { + rocketMQProducer.sendMessageWithTag("qy", + JSON.toJSONString(RocketMQMessage)); + } + + @GetMapping("/message-with-all") + public void sendMessageWithAll() { + rocketMQProducer.sendMessageWithAll("q", "q", + JSON.toJSONString(RocketMQMessage)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerMessageExt.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerMessageExt.java new file mode 100644 index 0000000..69a832f --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerMessageExt.java @@ -0,0 +1,29 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + *

第三个 RocketMQ 消费者,

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "q-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-message-ext" +) +public class RocketMQConsumerMessageExt implements RocketMQListener { + + @Override + public void onMessage(MessageExt message) { + + String value = new String(message.getBody()); + log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]", + message.getKeys(), value); // 能拿到消息中的 key + log.info("MessageExt: [{}]", JSON.toJSONString(message)); // 会慢一些 + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerObject.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerObject.java new file mode 100644 index 0000000..1a58319 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerObject.java @@ -0,0 +1,27 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.example.vo.QMessage; +import org.springframework.stereotype.Component; + +/** + *

第四个, RocketMQ 消费者, 指定消费带有 tag 的消息, 且消费的是 Java Pojo

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "imooc-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-tag-object", + selectorExpression = "q" // 根据 tag 做过滤 +) +public class RocketMQConsumerObject implements RocketMQListener { + @Override + public void onMessage(QMessage message) { + log.info("consume message in RocketMQConsumerObject: [{}]", + JSON.toJSONString(message)); + // so something + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerString.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerString.java new file mode 100644 index 0000000..5c274bd --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerString.java @@ -0,0 +1,28 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.example.vo.QMessage; +import org.springframework.stereotype.Component; + +/** + *

第一个 RocketMQ 消费者

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "q-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-string" +) +public class RocketMQConsumerString implements RocketMQListener { + + @Override + public void onMessage(String message) { + + QMessage rocketMessage = JSON.parseObject(message, QMessage.class); + log.info("consume message in RocketMQConsumerString: [{}]", + JSON.toJSONString(rocketMessage)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerTagString.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerTagString.java new file mode 100644 index 0000000..c5a8f7d --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerTagString.java @@ -0,0 +1,28 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.example.vo.QMessage; +import org.springframework.stereotype.Component; + +/** + *

第二个 RocketMQ 消费者, 指定了消费带有 tag 的消息

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "q-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-tag-string", + selectorExpression = "qy" // 根据 tag 过滤, tag 中要带有 qy +) +public class RocketMQConsumerTagString implements RocketMQListener { + @Override + public void onMessage(String message) { + + QMessage rocketMessage = JSON.parseObject(message, QMessage.class); + log.info("consume message in RocketMQConsumerTagString: [{}]", + JSON.toJSONString(rocketMessage)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQProducer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQProducer.java new file mode 100644 index 0000000..13711ff --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQProducer.java @@ -0,0 +1,100 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.example.vo.QMessage; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +/** + *

通过 RocketMQ 发送消息

+ * Spring Messaging 模块 + * */ +@Slf4j +@Component +public class RocketMQProducer { + + /** 类似 Kafka 中的 topic, 默认的读写队列都是4个, 默认自动创建topic */ + private static final String TOPIC = "q-study-rocketmq"; + + /** RocketMQ 客户端 */ + private final RocketMQTemplate rocketMQTemplate; + + public RocketMQProducer(RocketMQTemplate rocketMQTemplate) { + this.rocketMQTemplate = rocketMQTemplate; + } + + /** + *

使用同步的方式发送消息, 不指定 key 和 tag

+ * */ + public void sendMessageWithValue(String value) { + + // 随机选择一个 Topic 的 Message Queue 发送消息 + SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value); + log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(sendResult)); + + SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly( + TOPIC, value, "QQQ" + ); + log.info("sendMessageWithValue orderly result: [{}]", + JSON.toJSONString(sendResultOrderly)); + } + + /** + *

使用异步的方式发送消息, 指定 key

+ * */ + public void sendMessageWithKey(String key, String value) { + + Message message = MessageBuilder.withPayload(value) + // 这个Key 不是分区的效果, 只是方便进行查询, 在设置的时候, 可以使用空格进行分开, 例如: aaaa bbb + .setHeader(RocketMQHeaders.KEYS, key).build(); + + // 异步发送消息, 并设定回调 + rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() { + + @Override + public void onSuccess(SendResult sendResult) { + log.info("sendMessageWithKey success result: [{}]", + JSON.toJSONString(sendResult)); + } + + @Override + public void onException(Throwable e) { + log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e); + } + }); + } + + /** + *

使用同步的方式发送消息, 带有 tag, 且发送的是 Java Pojo

+ * 发送消息可以定义自己的消息数据结构 + * */ + public void sendMessageWithTag(String tag, String value) { + + QMessage qMessage = JSON.parseObject(value, QMessage.class); + SendResult sendResult = rocketMQTemplate.syncSend( + String.format("%s:%s", TOPIC, tag), // 不同的消费者组使用不同的 tag + qMessage + ); + log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult)); + } + + /** + *

使用同步的方式发送消息, 带有 key 和 tag

+ * */ + public void sendMessageWithAll(String key, String tag, String value) { + + Message message = MessageBuilder.withPayload(value) + .setHeader(RocketMQHeaders.KEYS, key).build(); + SendResult sendResult = rocketMQTemplate.syncSend( + String.format("%s:%s", TOPIC, tag), + message + ); + log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http index 3fd0c0b..dc3cd2d 100644 --- a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http @@ -1,15 +1,15 @@ ### message-with-value -GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-value +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-value Content-Type: application/json ### message-with-key -GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-key +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-key Content-Type: application/json ### message-with-tag -GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-tag +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-tag Content-Type: application/json ### message-with-all -GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-all +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-all Content-Type: application/json diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml index 775c208..500ebe9 100644 --- a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml @@ -12,7 +12,7 @@ spring: discovery: enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可 server-addr: 127.0.0.1:8848 - namespace: 1bc13fd5-843b-4ac0-aa55-695c25bc0ac6 + namespace: 1ccc74ae-9398-4dbe-b9d7-4f9addf9f40c metadata: management: context-path: ${server.servlet.context-path}/actuator @@ -27,15 +27,15 @@ spring: # rocketmq: # binder: # name-server: 127.0.0.1:9876 - # fixme 开启 stream 分区支持 + # 开启 stream 分区支持 instanceCount: 1 # 消费者的总数 - instanceIndex: 0 # 当前消费者的索引 + instanceIndex: 0 # 当前消费者的索引, 多个的时候, 另外一个要配置成 1, 依次类推 bindings: # 默认发送方 output: # 这里用 Stream 给我们提供的默认 output 信道 destination: dev-protocol-springcloud-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要 - # fixme 消息分区 + # 消息分区 - 只对默认接收方和发送方进行配置, 下面自定义的原理相同 producer: # fixme partitionKeyExpression 这种方式需要我们需要对 author 字段进行各种处理, 这里不使用这个方式, 使用下面自定义的配置策略 # partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性, 不存在 author 可能会报错 @@ -46,9 +46,9 @@ spring: # 默认接收方 input: # 这里用 Stream 给我们提供的默认 input 信道 destination: dev-protocol-springcloud-stream-client-default - # fixme 这里指定了group 方便其进行 分组消费 + # 这里指定了group 方便其进行 分组消费 group: dev-protocol-springcloud-stream-client-group-default - # fixme 消费者开启 分区支持 + # 消费者开启 分区支持 consumer: partitioned: true # q 发送方 destination 对应