diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/SpringCloud-Stream.md b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md
similarity index 99%
rename from dev-protocol-springcloud/dev-protocol-springcloud-stream/SpringCloud-Stream.md
rename to dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md
index fd4d062..f15be38 100644
--- a/dev-protocol-springcloud/dev-protocol-springcloud-stream/SpringCloud-Stream.md
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md
@@ -3,10 +3,10 @@
### SpringBoot 集成 Kafka 构建消息驱动微服务
-
+-
### SpringBoot 集成 RocketMQ 构建消息驱动微服务
-
+-
### SpringCloud Stream 消息驱动组件概览
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/pom.xml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/pom.xml
new file mode 100644
index 0000000..6a62ee6
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/pom.xml
@@ -0,0 +1,99 @@
+
+
+ 4.0.0
+
+ org.example
+ dev-protocol
+ 1.0-SNAPSHOT
+ ../../pom.xml
+
+
+ dev-protocol-springcloud-message-study
+ 1.0-SNAPSHOT
+ jar
+
+ dev-protocol-springcloud-message-study
+ MQ 学习实践
+
+
+ 8
+ 8
+ UTF-8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.projectlombok
+ lombok
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+ org.springframework.cloud
+ spring-cloud-context
+ 2.2.6.RELEASE
+
+
+
+ org.springframework.kafka
+ spring-kafka
+ 2.5.0.RELEASE
+
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ 2.1.0
+
+
+ org.apache.commons
+ commons-lang3
+ 3.11
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.19
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-jpa
+
+
+
+
+ ${artifactId}
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ repackage
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java
new file mode 100644
index 0000000..5d8dbdc
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java
@@ -0,0 +1,14 @@
+package org.example;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ *
工程启动入口
+ * */
+@SpringBootApplication
+public class Application {
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+}
\ No newline at end of file
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java
new file mode 100644
index 0000000..43daacc
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java
@@ -0,0 +1,41 @@
+package org.example.controller;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.example.kafka.KafkaProducer;
+import org.example.vo.QMessage;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * SpringBoot 集成 kafka 发送消息
+ * */
+@Slf4j
+@RestController
+@RequestMapping("/kafka")
+public class KafkaController {
+
+ private final ObjectMapper mapper;
+ private final KafkaProducer kafkaProducer;
+
+ public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) {
+ this.mapper = mapper;
+ this.kafkaProducer = kafkaProducer;
+ }
+
+ /**
+ * 发送 kafka 消息
+ * */
+ @GetMapping("/send-message")
+ public void sendMessage(@RequestParam(required = false) String key,
+ @RequestParam String topic) throws Exception {
+
+ QMessage message = new QMessage(
+ 1,
+ "q-Study-Message"
+ );
+ kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
+ }
+}
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java
new file mode 100644
index 0000000..2882d17
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java
@@ -0,0 +1,81 @@
+package org.example.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 通过代码自定义 Kafka 配置
+ *
+ * 复杂不会怎么更改的配置信息最好以代码的方式进行配置
+ * 一般一些常用的改动的配置, 直接放在配置文件中
+ * */
+@Configuration
+public class KafkaConfig {
+
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String bootstrapServers;
+
+ /**
+ * Kafka Producer 工厂类配置
+ * */
+ @Bean
+ public ProducerFactory producerFactory() {
+
+ Map configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new DefaultKafkaProducerFactory<>(configs);
+ }
+
+ /**
+ * Kafka Producer 客户端
+ * */
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+
+ /**
+ * Kafka Consumer 工厂类配置
+ * */
+ @Bean
+ public ConsumerFactory consumerFactory() {
+
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ // 因为配置为拉取模式, 最多拉取 50条记录
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ /**
+ * Kafka Consumer 监听器工厂类配置
+ * */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory
+ kafkaListenerContainerFactory() {
+
+ ConcurrentKafkaListenerContainerFactory factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ // 并发数就是一个消费者实例起几个线程
+ factory.setConcurrency(3);
+ factory.setConsumerFactory(consumerFactory());
+
+ return factory;
+ }
+}
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..1d911ae
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java
@@ -0,0 +1,56 @@
+package org.example.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.example.vo.QMessage;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+/**
+ * Kafka 消费者
+ * */
+@Slf4j
+@Component
+public class KafkaConsumer {
+
+ private final ObjectMapper mapper;
+
+ public KafkaConsumer(ObjectMapper mapper) {
+ this.mapper = mapper;
+ }
+
+ /**
+ * 监听 Kafka 消息并消费
+ * */
+ @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka")
+ public void listener01(ConsumerRecord record) throws Exception {
+
+ String key = record.key();
+ String value = record.value();
+
+ QMessage kafkaMessage = mapper.readValue(value, QMessage.class);
+ log.info("in listener01 consume kafka message: [{}], [{}]",
+ key, mapper.writeValueAsString(kafkaMessage));
+ }
+
+ /**
+ * 监听 Kafka 消息并消费
+ * 不知道发送的类是什么类型的时候发送的
+ * */
+ @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1")
+ public void listener02(ConsumerRecord, ?> record) throws Exception {
+
+ Optional> _kafkaMessage = Optional.ofNullable(record.value());
+ if (_kafkaMessage.isPresent()) {
+ Object message = _kafkaMessage.get();
+ // 如果不能确定类型的时候, 下面的代码要进行包装
+ QMessage kafkaMessage = mapper.readValue(message.toString(),
+ QMessage.class);
+ log.info("in listener02 consume kafka message: [{}]",
+ mapper.writeValueAsString(kafkaMessage));
+ }
+ }
+}
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java
new file mode 100644
index 0000000..b63987c
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java
@@ -0,0 +1,77 @@
+package org.example.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * kafka 生产者
+ * */
+@Slf4j
+@Component
+public class KafkaProducer {
+
+ private final KafkaTemplate kafkaTemplate;
+
+ public KafkaProducer(KafkaTemplate kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ /**
+ * 发送 kafka 消息
+ * */
+ public void sendMessage(String key, String value, String topic) {
+
+ if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
+ throw new IllegalArgumentException("value or topic is null or empty");
+ }
+
+ ListenableFuture> future = StringUtils.isBlank(key) ?
+ kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
+
+ // 异步回调的方式获取通知
+ future.addCallback(
+ success -> {
+ // 元数据信息不为空
+ assert null != success && null != success.getRecordMetadata();
+ // 发送到 kafka 的 topic
+ String _topic = success.getRecordMetadata().topic();
+ // 消息发送到的分区
+ int partition = success.getRecordMetadata().partition();
+ // 消息在分区内的 offset
+ long offset = success.getRecordMetadata().offset();
+
+ log.info("send kafka message success: [{}], [{}], [{}]",
+ _topic, partition, offset);
+ }, failure -> {
+ log.error("send kafka message failure: [{}], [{}], [{}]",
+ key, value, topic);
+ }
+ );
+ // future 支持多次获取消息, 不需要重新发送消息
+
+ // 同步等待的方式获取通知
+ try {
+// SendResult sendResult = future.get();
+ SendResult sendResult = future.get(5, TimeUnit.SECONDS);
+
+ // 发送到 kafka 的 topic
+ String _topic = sendResult.getRecordMetadata().topic();
+ // 消息发送到的分区
+ int partition = sendResult.getRecordMetadata().partition();
+ // 消息在分区内的 offset
+ long offset = sendResult.getRecordMetadata().offset();
+
+ log.info("send kafka message success: [{}], [{}], [{}]",
+ _topic, partition, offset);
+ } catch (Exception ex) {
+ log.error("send kafka message failure: [{}], [{}], [{}]",
+ key, value, topic);
+ }
+ }
+}
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java
new file mode 100644
index 0000000..7d4c20f
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java
@@ -0,0 +1,27 @@
+package org.example.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 通过 Kafka 传递的消息对象
+ * */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class QMessage {
+
+ /**
+ * 唯一的标识一个消息对象
+ */
+ private Integer id;
+
+ /**
+ * 项目名称
+ */
+ private String projectName;
+
+ // todo 自己进行扩展
+
+}
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml
new file mode 100644
index 0000000..9695606
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml
@@ -0,0 +1,49 @@
+server:
+ port: 8001
+ servlet:
+ context-path: /dev-protocol-springcloud-message-study
+
+spring:
+ # SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
+ kafka:
+ bootstrap-servers: 127.0.0.1:9092
+ jpa:
+ show-sql: true
+ hibernate:
+ ddl-auto: none
+ properties:
+ hibernate.show_sql: true
+ hibernate.format_sql: true
+ open-in-view: false
+ datasource:
+ # 数据源
+ url: jdbc:mysql://127.0.0.1:3306/dev_protocol_springcloud_project?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
+ username: root
+ password: root
+ type: com.zaxxer.hikari.HikariDataSource
+ driver-class-name: com.mysql.cj.jdbc.Driver
+ # 连接池
+ hikari:
+ maximum-pool-size: 8
+ minimum-idle: 4
+ idle-timeout: 30000
+ connection-timeout: 30000
+ max-lifetime: 45000
+ auto-commit: true
+ pool-name: devProtocolSpringcloudHikariCP
+# consumer:
+ # 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
+# group-id: imooc-study-ecommerce
+# auto-offset-reset: latest
+# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+# producer:
+# key-serializer: org.apache.kafka.common.serialization.StringSerializer
+# value-serializer: org.apache.kafka.common.serialization.StringSerializer
+
+# RocketMQ 的配置, 这是最低配置
+rocketmq:
+ name-server: 127.0.0.1:9876
+ producer:
+ # 发送同一类消息的设置为同一个 group, 保证唯一
+ group: dev-protocol-springcloud-message-study
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml
new file mode 100644
index 0000000..2366e3d
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml
@@ -0,0 +1,16 @@
+spring:
+ profiles:
+ # prod, dev
+ active: dev
+ application:
+ name: dev-protocol-springcloud-message-study
+
+# 暴露端点
+management:
+ endpoints:
+ web:
+ exposure:
+ include: '*'
+ endpoint:
+ health:
+ show-details: always
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http
new file mode 100644
index 0000000..6d183bd
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http
@@ -0,0 +1,7 @@
+### kafka-send-message
+GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?key=qqq&topic=q-springboot
+Content-Type: application/json
+
+### kafka-send-message - (测试kafka 支持无key消息, key 是用来分区的)
+GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?topic=q-springboot
+Content-Type: application/json
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http
new file mode 100644
index 0000000..3fd0c0b
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http
@@ -0,0 +1,15 @@
+### message-with-value
+GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-value
+Content-Type: application/json
+
+### message-with-key
+GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-key
+Content-Type: application/json
+
+### message-with-tag
+GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-tag
+Content-Type: application/json
+
+### message-with-all
+GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-all
+Content-Type: application/json
diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http
new file mode 100644
index 0000000..de7d991
--- /dev/null
+++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http
@@ -0,0 +1,7 @@
+### wrong-rollback-for
+GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-rollback-for
+Content-Type: application/json
+
+### wrong-inner-call
+GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-inner-call
+Content-Type: application/json
diff --git a/pom.xml b/pom.xml
index d1d50db..fb471b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
dev-protocol-springcloud/dev-protocol-springcloud-project-goods-service
dev-protocol-springcloud/dev-protocol-springcloud-hystrix
dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard
+ dev-protocol-springcloud/dev-protocol-springcloud-message-study