[新增功能](master): springboot-rocketmq整合

整合Springboot与RocketMQ相关的代码
master
土豆兄弟 2 years ago
parent 762150042d
commit 105486b682

@ -50,12 +50,6 @@
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version> <version>2.5.0.RELEASE</version>
</dependency> </dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>

@ -1,15 +0,0 @@
### message-with-value
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-value
Content-Type: application/json
### message-with-key
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-key
Content-Type: application/json
### message-with-tag
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-tag
Content-Type: application/json
### message-with-all
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-all
Content-Type: application/json

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dev-protocol</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springboot-rocketmq</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- SpringBoot 监控端点 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 让 SpringBoot 能够识别 bootstrap.yml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- Spring Data Jpa -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,12 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootRocketMQApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootRocketMQApplication.class, args);
}
}

@ -0,0 +1,57 @@
package com.baiye.controller;
import com.alibaba.fastjson.JSON;
import com.baiye.rocket.RocketMQProducer;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* rocketmq-controller
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@RestController
@RequestMapping("/rocket-mq")
public class RocketMQController {
/**
*
*/
public static final QMessage ROCKETMQMESSAGE = new QMessage(
1,
"Q-Study-Rocket-Springboot-Message"
);
public final RocketMQProducer producer;
public RocketMQController(RocketMQProducer producer) {
this.producer = producer;
}
@GetMapping("/message-with-value")
public void sendMessageWithValue() {
producer.sendMessageWithValue(JSON.toJSONString(ROCKETMQMESSAGE));
}
@GetMapping("/message-with-key")
public void sendMessageWithKey() {
producer.sendMessageWithKey("q", JSON.toJSONString(ROCKETMQMESSAGE));
}
@GetMapping("/message-with-tag")
public void sendMessageWithTag() {
producer.sendMessageWithTag("q-tag", JSON.toJSONString(ROCKETMQMESSAGE));
}
@GetMapping("/message-with-all")
public void sendMessageWithAll(){
producer.sendMessageWithAll("qq qa", "q-tag", JSON.toJSONString(ROCKETMQMESSAGE));
}
}

@ -0,0 +1,36 @@
package com.baiye.rocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* Consumer
*
* MessageExt Message , , key
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@Component
@RocketMQMessageListener(
// 与生产端的 topic 保持一致
topic = "study-springboot-rocketmq",
consumerGroup = "springboot-rocketmq-message-ext-group"
)
public class RocketMQConsumerMessageExt implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String value = new String(messageExt.getBody());
log.info("consumer message in RocketMQConsumerMessageExt: [{}], [{}]", messageExt.getKeys(), value);
// 打印 messageExt 全部信息进行了解所有元信息
// todo 获取元信息比较慢一点, 上线的时候没必要开启
// FIXME: 这里使用的是 fast-json
log.info("MessageExt: [{}]", JSON.toJSONString(messageExt));
}
}

@ -0,0 +1,33 @@
package com.baiye.rocket;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* RocketMQ , tag , Java Pojo
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@Component
@RocketMQMessageListener(
// 与生产端的 topic 保持一致
topic = "study-springboot-rocketmq",
consumerGroup = "springboot-rocketmq-tag-object-group",
// 指定匹配 tag - 匹配到 tag 才会被消费,相当于代替 if-else 的逻辑
selectorExpression = "q-tag"
)
public class RocketMQConsumerObject implements RocketMQListener<QMessage> {
@Override
public void onMessage(QMessage message) {
log.info("consumer message in RocketMQConsumerObject: [{}]", JSON.toJSONString(message));
// todo 做业务逻辑
}
}

@ -0,0 +1,31 @@
package com.baiye.rocket;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* string
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@Component
@RocketMQMessageListener(
// 与生产端的 topic 保持一致
topic = "study-springboot-rocketmq",
consumerGroup = "springboot-rocketmq-string-group"
)
public class RocketMQConsumerString implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
QMessage qMessage = JSON.parseObject(message, QMessage.class);
// FIXME: 这里使用的是 fast-json
log.info("consumer message in RocketMQConsumerString: [{}]", JSON.toJSONString(qMessage));
}
}

@ -0,0 +1,33 @@
package com.baiye.rocket;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* RocketMQ , tag
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@Component
@RocketMQMessageListener(
// 与生产端的 topic 保持一致
topic = "study-springboot-rocketmq",
consumerGroup = "springboot-rocketmq-tag-group",
// 指定匹配 tag - 匹配到 tag 才会被消费,相当于代替 if-else 的逻辑
selectorExpression = "q-tag"
)
public class RocketMQConsumerTagString implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
QMessage qMessage = JSON.parseObject(message, QMessage.class);
// FIXME: 这里使用的是 fast-json
log.info("consumer message in RocketMQConsumerTagString: [{}]", JSON.toJSONString(qMessage));
}
}

@ -0,0 +1,130 @@
package com.baiye.rocket;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* RocketMQ
*
* :
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@Component
public class RocketMQProducer {
/**
* Kafka topic, 4
*/
public static final String TOPIC = "study-springboot-rocketmq";
/**
* RocketMQ
*/
public final RocketMQTemplate rocketMQTemplate;
public RocketMQProducer(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
/**
* 使, kay tag
*
* @param value
*/
public void sendMessageWithValue(String value) {
// 发送方式1: 随机选择一个 Topic 的 Message Queue 发送消息
SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value);
// FIXME: 这里使用的是 fast-json
log.info("sendMessageWithValue result : [{}]", JSON.toJSONString(sendResult));
// 发送方式2: 指定 sharding-key 分区[hashKey], 保证同一类型的消息顺序的发送和消费, 既保证的高并发的性能, 又保证了业务的连贯性
SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly(TOPIC, value, "Q-shard");
// FIXME: 这里使用的是 fast-json
log.info("sendMessageWithValue orderly result : [{}]", JSON.toJSONString(sendResultOrderly));
}
/**
* 使, key, 便
*
* @param key
* @param value
*/
public void sendMessageWithKey(String key, String value) {
// 是基于 Spring Messaging 做的, 注意这里的 Message 对象选择
Message<String> message = MessageBuilder
.withPayload(value)
// 这里的 key 是索引键的含义 多个是时候用空格进行间隔,例如: "a b"
.setHeader(RocketMQHeaders.KEYS, key)
.build();
// 异步发送消息, 并设定回调
rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// FIXME: 这里使用的是 fast-json
log.info("sendMessageWithKey success result: [{}]", JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
}
});
}
/**
* 使, tag, Java Pojo -
* @param tag
* @param value
*/
public void sendMessageWithTag(String tag, String value) {
// 对象反序列化
QMessage qMessage = JSON.parseObject(value, QMessage.class);
// tag 含义: 消费者组订阅同一个topic, 不同的tag可以对消息进行隔离, 但是要保证同一个消费者组要保持定义的tag是一样的
SendResult sendResult = rocketMQTemplate.syncSend(
String.format("%s:%s", TOPIC, tag),
qMessage
);
log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult));
}
/**
* 使, key tag
*
* @param key
* @param tag
* @param value
*/
public void sendMessageWithAll(String key, String tag, String value) {
// 构建消息对象
Message<String> message = MessageBuilder
.withPayload(value)
.setHeader(RocketMQHeaders.KEYS, key)
.build();
// 同步发送
SendResult sendResult = rocketMQTemplate.syncSend(
String.format("%s:%s", TOPIC, tag),
message
);
log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult));
}
}

@ -0,0 +1,21 @@
package com.baiye.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* kafka
*
* @author q
* @date 2022/07/23
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class QMessage {
private Integer id;
private String productName;
}

@ -0,0 +1,50 @@
server:
port: 8001
servlet:
context-path: /springboot-rocketmq-dev
# RocketMQ 的配置, 这是最低配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
# 发送同一类消息的设置为同一个 group, 保证唯一
group: study-ecommerce
#spring:
# # SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
# kafka:
# bootstrap-servers: 127.0.0.1:9092
# jpa:
# show-sql: true
# hibernate:
# ddl-auto: none
# properties:
# hibernate.show_sql: true
# hibernate.format_sql: true
# open-in-view: false
# datasource:
# # 数据源
# url: jdbc:mysql://127.0.0.1:3306/q_springboot_kafka?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false
# username: root
# password: root
# type: com.zaxxer.hikari.HikariDataSource
# driver-class-name: com.mysql.cj.jdbc.Driver
# # 连接池
# hikari:
# maximum-pool-size: 8
# minimum-idle: 4
# idle-timeout: 30000
# connection-timeout: 30000
# max-lifetime: 45000
# auto-commit: true
# pool-name: StudyEcommerce
# consumer:
# 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
# group-id: imooc-study-ecommerce
# auto-offset-reset: latest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer

@ -0,0 +1,16 @@
spring:
profiles:
# prod, dev
active: dev
application:
name: study-ecommerce
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always

@ -0,0 +1,15 @@
### message-with-value
GET http://127.0.0.1:8001/springboot-rocketmq-dev/rocket-mq/message-with-value
Content-Type: application/json
### message-with-key
GET http://127.0.0.1:8001/springboot-rocketmq-dev/rocket-mq/message-with-key
Content-Type: application/json
### message-with-tag
GET http://127.0.0.1:8001/springboot-rocketmq-dev/rocket-mq/message-with-tag
Content-Type: application/json
### message-with-all
GET http://127.0.0.1:8001/springboot-rocketmq-dev/rocket-mq/message-with-all
Content-Type: application/json

@ -0,0 +1,7 @@
### wrong-rollback-for
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-rollback-for
Content-Type: application/json
### wrong-inner-call
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-inner-call
Content-Type: application/json

@ -35,6 +35,7 @@
<module>database/shardingsphere/dev-protocol-shardingsphere-base</module> <module>database/shardingsphere/dev-protocol-shardingsphere-base</module>
<module>mq/rocketmq/rocket-api</module> <module>mq/rocketmq/rocket-api</module>
<module>bigdata/kafka/springboot-kafka</module> <module>bigdata/kafka/springboot-kafka</module>
<module>mq/rocketmq/springboot-rocketmq</module>
</modules> </modules>
<properties> <properties>

Loading…
Cancel
Save