diff --git a/mq/rocketmq/README.md b/mq/rocketmq/README.md
index 80b39b4..a79bdfa 100644
--- a/mq/rocketmq/README.md
+++ b/mq/rocketmq/README.md
@@ -70,7 +70,22 @@
- 可以集成控制台,修改控制台即可
- 集成SpringBoot也在这里
-## 2. RocketMQ 生产者核心研究
+## 2. RocketMQ 基本入门
+
+### 2.1 生产者的使用及使用控制台查消息
+1. 创建生产者对象 DefaultMQProducer (必须要对生产者组的名称进行配置)
+2. 设置 NamesrvAddr
+3. 设置一些需要配置的参数
+4. 启动生产者服务
+5. 创建消息并发送
+
+### 2.2 消费者的使用及Broker重试机制
+1. 创建消费者对象 DefaultMQPushConsumer
+2. 设置NamesrvAddr及其消费位置ConsumerFromWhere
+3. 进行订阅主题 subscribe
+4. 注册监听并消费 registerMessageListener
+
+### 2.3 RocketMQ - 四种集群环境构建详解
diff --git a/mq/rocketmq/rocket-api/pom.xml b/mq/rocketmq/rocket-api/pom.xml
new file mode 100644
index 0000000..47d62e6
--- /dev/null
+++ b/mq/rocketmq/rocket-api/pom.xml
@@ -0,0 +1,39 @@
+
+
+
+ dev-protocol
+ org.example
+ 1.0-SNAPSHOT
+ ../../../pom.xml
+
+ 4.0.0
+
+ rocket-api
+
+
+ 8
+ 8
+
+ 4.3.0
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${rocketmq.version}
+
+
+
+
\ No newline at end of file
diff --git a/mq/rocketmq/rocket-api/src/main/java/com/baiye/constants/Const.java b/mq/rocketmq/rocket-api/src/main/java/com/baiye/constants/Const.java
new file mode 100644
index 0000000..d90564d
--- /dev/null
+++ b/mq/rocketmq/rocket-api/src/main/java/com/baiye/constants/Const.java
@@ -0,0 +1,9 @@
+package com.baiye.constants;
+
+public class Const {
+
+ /**
+ * namesrv 的地址
+ */
+ public static final String NAMESRV_ADDR = "127.0.0.1:9876";
+}
diff --git a/mq/rocketmq/rocket-api/src/main/java/com/baiye/consumer/Consumer.java b/mq/rocketmq/rocket-api/src/main/java/com/baiye/consumer/Consumer.java
new file mode 100644
index 0000000..b79139b
--- /dev/null
+++ b/mq/rocketmq/rocket-api/src/main/java/com/baiye/consumer/Consumer.java
@@ -0,0 +1,72 @@
+package com.baiye.consumer;
+
+import com.baiye.constants.Const;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 消费者 Demo
+ */
+public class Consumer {
+
+ public static void main(String[] args) throws MQClientException {
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_name");
+
+ consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
+
+ // 设置从什么地方开始消费
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+ consumer.subscribe(
+ "test-topic",
+ "*"
+ );
+
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
+ MessageExt messageExt = msgs.get(0);
+
+ try {
+ String topic = messageExt.getTopic();
+ String tags = messageExt.getTags();
+ String keys = messageExt.getKeys();
+
+ // FIXME: 2022/7/21 模拟消息消费出现异常的情况
+ if ("key1".equals(keys)) {
+ System.out.println(" 消息消费失败 ... ");
+ int a = 1 / 0;
+ }
+ String msgBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
+
+ System.out.println("topic = " + topic);
+ System.out.println("tags = " + tags);
+ System.out.println("keys = " + keys);
+ System.out.println("msgBody = " + msgBody);
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ // FIXME: 2022/7/21 允许消息最大重试次数是多少次,我这里指定为3次
+ int reconsumeTimes = messageExt.getReconsumeTimes();
+ System.out.println("reconsumeTimes = " + reconsumeTimes);
+ if (3 == reconsumeTimes) {
+ // FIXME: 2022/7/21 记录日志,做补偿处理
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+
+ consumer.start();
+
+ System.out.println(" consumer start ... ");
+
+ }
+}
diff --git a/mq/rocketmq/rocket-api/src/main/java/com/baiye/producer/Producer.java b/mq/rocketmq/rocket-api/src/main/java/com/baiye/producer/Producer.java
new file mode 100644
index 0000000..65a968d
--- /dev/null
+++ b/mq/rocketmq/rocket-api/src/main/java/com/baiye/producer/Producer.java
@@ -0,0 +1,49 @@
+package com.baiye.producer;
+
+import com.baiye.constants.Const;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 生产者 Demo
+ */
+public class Producer {
+
+ public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
+
+ DefaultMQProducer producer = new DefaultMQProducer("test_producer_name");
+
+ producer.setNamesrvAddr(Const.NAMESRV_ADDR);
+
+ producer.start();
+
+ for (int i = 0; i < 10; i++) {
+ // 创建消息
+ Message message = new Message(
+ // 主题
+ "test-topic",
+ // 过滤
+ "TAG-A",
+ // 用户自定义的key, 唯一标识
+ "TAG-a",
+ // 消息内容
+ ("Hello RocketMQ" + i).getBytes(StandardCharsets.UTF_8)
+ );
+
+ // 发送消息
+ SendResult sendResult = producer.send(message);
+
+ // 消息发送结果
+ System.out.println(sendResult);
+ }
+
+ // 发送消息完毕进行关闭
+ producer.shutdown();
+ }
+}
diff --git a/pom.xml b/pom.xml
index ff2481d..d816f1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,6 +33,7 @@
longpolling/demo/demo1/dev-protocol-disruptor-netty-common
utils/dev-protocol-id
database/shardingsphere/dev-protocol-shardingsphere-base
+ mq/rocketmq/rocket-api