From be3c49e18365db23bfb29881b9dcfa02de0edfe4 Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Fri, 22 Jul 2022 11:32:58 +0800 Subject: [PATCH] =?UTF-8?q?[=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD](master):?= =?UTF-8?q?=20=E6=B7=BB=E5=8A=A0RocketMQ=E7=9B=B8=E5=85=B3=E7=9A=84?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E5=8F=8A=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 简单添加了代码 --- mq/rocketmq/README.md | 17 ++++- mq/rocketmq/rocket-api/pom.xml | 39 ++++++++++ .../main/java/com/baiye/constants/Const.java | 9 +++ .../java/com/baiye/consumer/Consumer.java | 72 +++++++++++++++++++ .../java/com/baiye/producer/Producer.java | 49 +++++++++++++ pom.xml | 1 + 6 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 mq/rocketmq/rocket-api/pom.xml create mode 100644 mq/rocketmq/rocket-api/src/main/java/com/baiye/constants/Const.java create mode 100644 mq/rocketmq/rocket-api/src/main/java/com/baiye/consumer/Consumer.java create mode 100644 mq/rocketmq/rocket-api/src/main/java/com/baiye/producer/Producer.java diff --git a/mq/rocketmq/README.md b/mq/rocketmq/README.md index 80b39b4..a79bdfa 100644 --- a/mq/rocketmq/README.md +++ b/mq/rocketmq/README.md @@ -70,7 +70,22 @@ - 可以集成控制台,修改控制台即可 - 集成SpringBoot也在这里 -## 2. RocketMQ 生产者核心研究 +## 2. RocketMQ 基本入门 + +### 2.1 生产者的使用及使用控制台查消息 +1. 创建生产者对象 DefaultMQProducer (必须要对生产者组的名称进行配置) +2. 设置 NamesrvAddr +3. 设置一些需要配置的参数 +4. 启动生产者服务 +5. 创建消息并发送 + +### 2.2 消费者的使用及Broker重试机制 +1. 创建消费者对象 DefaultMQPushConsumer +2. 设置NamesrvAddr及其消费位置ConsumerFromWhere +3. 进行订阅主题 subscribe +4. 注册监听并消费 registerMessageListener + +### 2.3 RocketMQ - 四种集群环境构建详解 diff --git a/mq/rocketmq/rocket-api/pom.xml b/mq/rocketmq/rocket-api/pom.xml new file mode 100644 index 0000000..47d62e6 --- /dev/null +++ b/mq/rocketmq/rocket-api/pom.xml @@ -0,0 +1,39 @@ + + + + dev-protocol + org.example + 1.0-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + rocket-api + + + 8 + 8 + + 4.3.0 + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-test + test + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + + + + \ No newline at end of file diff --git a/mq/rocketmq/rocket-api/src/main/java/com/baiye/constants/Const.java b/mq/rocketmq/rocket-api/src/main/java/com/baiye/constants/Const.java new file mode 100644 index 0000000..d90564d --- /dev/null +++ b/mq/rocketmq/rocket-api/src/main/java/com/baiye/constants/Const.java @@ -0,0 +1,9 @@ +package com.baiye.constants; + +public class Const { + + /** + * namesrv 的地址 + */ + public static final String NAMESRV_ADDR = "127.0.0.1:9876"; +} diff --git a/mq/rocketmq/rocket-api/src/main/java/com/baiye/consumer/Consumer.java b/mq/rocketmq/rocket-api/src/main/java/com/baiye/consumer/Consumer.java new file mode 100644 index 0000000..b79139b --- /dev/null +++ b/mq/rocketmq/rocket-api/src/main/java/com/baiye/consumer/Consumer.java @@ -0,0 +1,72 @@ +package com.baiye.consumer; + +import com.baiye.constants.Const; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +import java.nio.charset.StandardCharsets; + +/** + * 消费者 Demo + */ +public class Consumer { + + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_name"); + + consumer.setNamesrvAddr(Const.NAMESRV_ADDR); + + // 设置从什么地方开始消费 + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + consumer.subscribe( + "test-topic", + "*" + ); + + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> { + MessageExt messageExt = msgs.get(0); + + try { + String topic = messageExt.getTopic(); + String tags = messageExt.getTags(); + String keys = messageExt.getKeys(); + + // FIXME: 2022/7/21 模拟消息消费出现异常的情况 + if ("key1".equals(keys)) { + System.out.println(" 消息消费失败 ... "); + int a = 1 / 0; + } + String msgBody = new String(messageExt.getBody(), StandardCharsets.UTF_8); + + System.out.println("topic = " + topic); + System.out.println("tags = " + tags); + System.out.println("keys = " + keys); + System.out.println("msgBody = " + msgBody); + } catch (Exception e) { + e.printStackTrace(); + + // FIXME: 2022/7/21 允许消息最大重试次数是多少次,我这里指定为3次 + int reconsumeTimes = messageExt.getReconsumeTimes(); + System.out.println("reconsumeTimes = " + reconsumeTimes); + if (3 == reconsumeTimes) { + // FIXME: 2022/7/21 记录日志,做补偿处理 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + + consumer.start(); + + System.out.println(" consumer start ... "); + + } +} diff --git a/mq/rocketmq/rocket-api/src/main/java/com/baiye/producer/Producer.java b/mq/rocketmq/rocket-api/src/main/java/com/baiye/producer/Producer.java new file mode 100644 index 0000000..65a968d --- /dev/null +++ b/mq/rocketmq/rocket-api/src/main/java/com/baiye/producer/Producer.java @@ -0,0 +1,49 @@ +package com.baiye.producer; + +import com.baiye.constants.Const; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.nio.charset.StandardCharsets; + +/** + * 生产者 Demo + */ +public class Producer { + + public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + + DefaultMQProducer producer = new DefaultMQProducer("test_producer_name"); + + producer.setNamesrvAddr(Const.NAMESRV_ADDR); + + producer.start(); + + for (int i = 0; i < 10; i++) { + // 创建消息 + Message message = new Message( + // 主题 + "test-topic", + // 过滤 + "TAG-A", + // 用户自定义的key, 唯一标识 + "TAG-a", + // 消息内容 + ("Hello RocketMQ" + i).getBytes(StandardCharsets.UTF_8) + ); + + // 发送消息 + SendResult sendResult = producer.send(message); + + // 消息发送结果 + System.out.println(sendResult); + } + + // 发送消息完毕进行关闭 + producer.shutdown(); + } +} diff --git a/pom.xml b/pom.xml index ff2481d..d816f1e 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,7 @@ longpolling/demo/demo1/dev-protocol-disruptor-netty-common utils/dev-protocol-id database/shardingsphere/dev-protocol-shardingsphere-base + mq/rocketmq/rocket-api