[新增功能](master): springCloud-stream

springCloud-stream的使用实践和相关整合
master
土豆兄弟 2 years ago
parent 105486b682
commit f24059fa3f

@ -422,7 +422,6 @@
<dependency> <dependency>
<groupId>javax.annotation</groupId> <groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId> <artifactId>javax.annotation-api</artifactId>
<version>${annotation-api.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>
</profile> </profile>

@ -16,4 +16,33 @@
<maven.compiler.target>8</maven.compiler.target> <maven.compiler.target>8</maven.compiler.target>
</properties> </properties>
<dependencies>
<!-- 各种工具类, 建议其他模块直接进行依赖即可 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
</project> </project>

@ -0,0 +1,46 @@
### Example user template template
### Example user template
# IntelliJ project files
.idea
*.iml
out
gen
### Maven template
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar
### Java template
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

@ -0,0 +1,114 @@
## SpringCloud Stream
### 1. 为什么会出现 SpringCloud Stream
- 如果我们需要更换队列组件, 我们的代码就需要重写
- 解耦业务代码和中间件代码
### 2. SpringCloud Stream 中的核心概念
-Spring Messaging 模块
- SpringMessaging 是 Spring Framework 中的一个模块, 其作用就是统一消息的编程模型
- Messaging 对应的模型只包括 Payload 和 Header
```java
Producer --(send[Message])--> Message Channel --(receive)--> Consumer
```
- 负责与中间件交互的抽象绑定器: Binder
- 通过 Binder 组件实现与外部消息系统通信, 屏蔽了底层中间件的使用细节
- 消息分类映射为通信信道, 可以为不同类的消息自定义通信信道
- 发送消息与接收消息的应用通信信道: Input, Output
```java
Spring Cloud Stream Application
Application Core
| |\
| |
(inputs) (outputs)
| |
|/ |
Binder
|
Middleware
```
### 3. SpringCloud Stream 应用模型
- 经典的 SpringCloud Stream 发布-订阅模型
- 作者参考 Kafka 的模型设计的
```java
Message
|
Topic(Exchange)
| |
输入通道 输入通道
| |
订阅者-1 订阅者-2
```
- Topic 可以认为就是 Kafka 中的 Topic 概念
- Producer 通过 Input 信道发布消息到 Topic 上
- Consumer 通过 Output 信道消费 Topic 上的消息
### 4. SpringCloud Stream 消息分组和消费分区的配置与说明
```java
Message
|
Topic(Exchange)
—————————|—————————
| |
输入通道 输入通道
| |
Service-A Service-B
Service-A Service-B
(GroupA) (GroupB)
```
- SpringCloud Stream 消费分组
- 消费分组: 应用多实例部署, 提供服务吞吐量, 且不重复消费消息
- 不指定默认是每一个实例都会有个生成的消费组, 所以消息会被消费很多次
- 我们尽量指定消费者组, 让其消息只消费一次
- 忘记指定消费者组的情况, 会发生消息不一致的问题
- SpringCloud Stream 消费者组模型
- 应用的不同实例放在一个消费者组中, 每一条消息只会被一个实例消费
- 消费者组的思想是通过多实例扩展服务吞吐量, 且不会造成消息的重复消费
- 默认会对不指定消费者组, 都会进行发送消费, 除非你有这样的场景, 要不然建议是指定的
---
- 消费分区
- 消费分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理
- 我们要确保指定的消息会被指定的消费者组的分区进行消费, 这样可以保证数据的先后和一致性
- 通过自定义提取Key, 和转发策略来进行实现
- 这样做可以为不支持消费分区的消息中间件也实现了这种功能的扩展
````java
HTTP ---
| --> Partition1 --> Average Processor
HTTP ---
| --> Partition2 --> Average Processor
HTTP ---
````
### 5. TODO-list
- 研究 Bus 消息总线的集合使用-实现消息的统一发送
- 补充 stream 消息持久化
- 补充 stream 监控
- 补充 stream 异常处理
- Spring Cloud Stream + RocketMQ 实现分布式事务
- RabbitMQ 的整合

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dev-protocol</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dev-protocol-springcloud-stream</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- 依赖父工程的 common 模块-->
<dependency>
<groupId>org.example</groupId>
<artifactId>dev-protocol-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 创建工程需要的两个依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- zipkin = spring-cloud-starter-sleuth + spring-cloud-sleuth-zipkin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<!-- SpringCloud Stream-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- SpringCloud Stream + Kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<!-- SpringCloud Stream + RocketMQ -->
<!-- <dependency>-->
<!-- <groupId>com.alibaba.cloud</groupId>-->
<!-- <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>-->
<!-- </dependency>-->
</dependencies>
<!--
SpringBoot的Maven插件, 能够以Maven的方式为应用提供SpringBoot的支持可以将
SpringBoot应用打包为可执行的jar或war文件, 然后以通常的方式运行SpringBoot应用
-->
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,21 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* SpringCloud Stream
*
* @author q
* @date 2022/07/24
*/
@EnableDiscoveryClient
@SpringBootApplication
public class DevProtocolSpringCloudStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DevProtocolSpringCloudStreamApplication.class, args);
}
}

@ -0,0 +1,38 @@
package com.baiye.stream;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
* 使
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@EnableBinding(
Sink.class
)
public class DefaultReceiveService {
/**
* 使
*
* @param payloud payloud
*/
@StreamListener(Sink.INPUT)
public void receiveMessage(Object payloud) {
log.info("in DefaultReceiveService consumer message start ... ");
// 进行消息的反序列化
// FIXME: 这里使用的是 fast-json
QMessage message = JSON.parseObject(payloud.toString(), QMessage.class);
// 消费消息
// FIXME: 这里使用的是 fast-json
log.info("in DefaultReceiveService consumer message success : [{}]", JSON.toJSONString(message));
}
}

@ -0,0 +1,46 @@
package com.baiye.stream;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
/**
* 使
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@EnableBinding(
Source.class
)
public class DefaultSendService {
/**
*
*/
public final Source source;
public DefaultSendService(Source source) {
this.source = source;
}
/**
* 使
*
* @param message
*/
public void sendMessage(QMessage message) {
// FIXME: 这里使用的是 fast-json
String msg = JSON.toJSONString(message);
log.info("in DefaultSendService send message: [{}]", msg);
// Spring Messaging, 统一消息编程模型, 是 Stream 组件的重要组成部分之一
source.output().send(MessageBuilder.withPayload(msg).build());
}
}

@ -0,0 +1,47 @@
package com.baiye.stream.controller;
import com.baiye.stream.DefaultSendService;
import com.baiye.stream.my.QSendService;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@RestController
@RequestMapping("/message")
public class MessageController {
private final DefaultSendService defaultSendService;
private final QSendService qSendService;
public MessageController(DefaultSendService defaultSendService, QSendService qSendService) {
this.defaultSendService = defaultSendService;
this.qSendService = qSendService;
}
/**
*
*/
@GetMapping("/default")
public void defaultSend() {
defaultSendService.sendMessage(QMessage.defaultMessage());
}
/**
*
*/
@GetMapping("/q")
public void qSend() {
qSendService.sendMessage(QMessage.defaultMessage());
}
}

@ -0,0 +1,34 @@
package com.baiye.stream.my;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
/**
* 使
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@EnableBinding(QSink.class)
public class QReceiveService {
/**
* 使
*
* @param payloud payloud
*/
@StreamListener(QSink.INPUT)
public void receiveMessage(@Payload Object payloud) {
log.info(" in QReceiveService consumer message start ... ");
// 消息反序列化
QMessage qMessage = JSON.parseObject(payloud.toString(), QMessage.class);
log.info("in v consumer message success: [{}]", JSON.toJSONString(qMessage));
}
}

@ -0,0 +1,39 @@
package com.baiye.stream.my;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
/**
* 使 QSendService
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@EnableBinding(QSource.class)
public class QSendService {
private final QSource source;
public QSendService(QSource source) {
this.source = source;
}
/**
* 使
*
* @param message
*/
public void sendMessage(QMessage message) {
// FIXME: 这里使用的是 fast-json
String msg = JSON.toJSONString(message);
log.info("in QSendService send message: [{}]", message);
source.qOutput().send(
MessageBuilder.withPayload(msg).build()
);
}
}

@ -0,0 +1,23 @@
package com.baiye.stream.my;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
*
*
* @author q
* @date 2022/07/24
*/
public interface QSink {
String INPUT = "Q-InPut";
/**
* Q-InPut, 使 Stream yml
*
* @return {@link SubscribableChannel}
*/
@Input(QSink.INPUT)
SubscribableChannel qTnput();
}

@ -0,0 +1,26 @@
package com.baiye.stream.my;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
*
*
* @author q
* @date 2022/07/24
*/
public interface QSource {
String OUTPUT = "Q-OutPut";
/**
* Q-OutPut, 使 Stream yml
*
* @return {@link MessageChannel}
*/
@Output(QSource.OUTPUT)
MessageChannel qOutput();
}

@ -0,0 +1,31 @@
package com.baiye.stream.partition;
import com.alibaba.fastjson.JSON;
import com.baiye.vo.QMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* Message partition key
*
* -> Selector
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@Component
public class QPartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<?> message) {
// 进行反序列化
QMessage qMessage = JSON.parseObject(message.getPayload().toString(), QMessage.class);
// 自定义提取 Key
// todo 常见的策略有 加盐|加时间戳|加前后缀
String key = qMessage.getProjectName();
log.info("SpringCloud Stream Q Partition Key: [{}]", key);
return key;
}
}

@ -0,0 +1,32 @@
package com.baiye.stream.partition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.stereotype.Component;
/**
* message
*
* @author q
* @date 2022/07/24
*/
@Slf4j
@Component
public class QPartitionSelectorStrategy implements PartitionSelectorStrategy {
/**
*
*
* @param key - QPartitionKeyExtractorStrategy ProjectName
* @param partitionCount
* @return int
*/
@Override
public int selectPartition(Object key, int partitionCount) {
// todo 这里是一个示范 在这里用特定的业务处理来实现往哪个分区去发送
int partition = key.toString().hashCode() % partitionCount;
log.info("SpringCloud Stream Q Selector info: [{}], [{}], [{}]", key.toString(), partitionCount, partition);
return partition;
}
}

@ -0,0 +1,44 @@
package com.baiye.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* : SpringCloud Stream + Kafka/RocketMQ
*
* @author q
* @date 2022/07/24
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class QMessage {
private Integer id;
private String projectName;
private String org;
private String author;
private String version;
/**
* ,便使
*
* @return {@link QMessage}
*/
public static QMessage defaultMessage() {
return new QMessage(
1,
"springCloud-stream-client",
"baiye.com",
"q",
"1.0"
);
}
}

@ -0,0 +1,90 @@
server:
port: 8006
servlet:
context-path: /dev-protocol-springcloud-stream
spring:
application:
name: dev-protocol-springcloud-stream
cloud:
nacos:
# 服务注册发现
discovery:
enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可
server-addr: 127.0.0.1:8848
namespace: 1bc13fd5-843b-4ac0-aa55-695c25bc0ac6
metadata:
management:
context-path: ${server.servlet.context-path}/actuator
# 消息驱动的配置
stream:
# SpringCloud Stream + Kafka
kafka:
binder:
brokers: 127.0.0.1:9092
auto-create-topics: true # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好
# SpringCloud Stream + RocketMQ
# rocketmq:
# binder:
# name-server: 127.0.0.1:9876
# fixme 开启 stream 分区支持
instanceCount: 1 # 消费者的总数
instanceIndex: 0 # 当前消费者的索引
bindings:
# 默认发送方
output: # 这里用 Stream 给我们提供的默认 output 信道
destination: dev-protocol-springcloud-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic
content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要
# fixme 消息分区
producer:
# fixme partitionKeyExpression 这种方式需要我们需要对 author 字段进行各种处理, 这里不使用这个方式, 使用下面自定义的配置策略
# partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性, 不存在 author 可能会报错
partitionCount: 1 # 分区大小
# 使用自定义的分区策略, 注释掉 partitionKeyExpression 根据提取的Key 做一个 Selector 下面配置的是自定义Bean的名称,注意首字母要大写
partitionKeyExtractorName: qPartitionKeyExtractorStrategy
partitionSelectorName: qPartitionSelectorStrategy
# 默认接收方
input: # 这里用 Stream 给我们提供的默认 input 信道
destination: dev-protocol-springcloud-stream-client-default
# fixme 这里指定了group 方便其进行 分组消费
group: dev-protocol-springcloud-stream-client-group-default
# fixme 消费者开启 分区支持
consumer:
partitioned: true
# q 发送方 destination 对应
qOutput:
destination: dev-protocol-springcloud-stream-client-q
content-type: text/plain
# q 接收方
qInput:
destination: dev-protocol-springcloud-stream-client-q
# fixme 这里指定了group 方便其进行分组消费
group: dev-protocol-springcloud-stream-q-group
# spring-kafka 的配置
# kafka:
# bootstrap-servers: 127.0.0.1:9092
# producer:
# retries: 3
# consumer:
# auto-offset-reset: latest
# sleuth:
# sampler:
# # ProbabilityBasedSampler 抽样策略
# probability: 1.0 # 采样比例, 1.0 表示 100%, 默认是 0.1
# # RateLimitingSampler 抽样策略
# rate: 100 # 每秒间隔接受的 trace 量
# zipkin:
# sender:
# type: kafka # 默认是 http
# base-url: http://localhost:9411/
# 暴露端点
#management:
# endpoints:
# web:
# exposure:
# include: '*'
# endpoint:
# health:
# show-details: always

@ -0,0 +1,7 @@
### 发送默认信道消息
GET http://127.0.0.1:8006/dev-protocol-springcloud-stream/message/default
Content-Type: application/json
### 发送自定义信道消息
GET http://127.0.0.1:8006/dev-protocol-springcloud-stream/message/q
Content-Type: application/json

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dev-protocol</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dev-protocol-springcloud</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

@ -36,6 +36,8 @@
<module>mq/rocketmq/rocket-api</module> <module>mq/rocketmq/rocket-api</module>
<module>bigdata/kafka/springboot-kafka</module> <module>bigdata/kafka/springboot-kafka</module>
<module>mq/rocketmq/springboot-rocketmq</module> <module>mq/rocketmq/springboot-rocketmq</module>
<module>dev-protocol-springcloud</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-stream</module>
</modules> </modules>
<properties> <properties>
@ -44,11 +46,19 @@
<spring.boot.version>2.3.2.RELEASE</spring.boot.version> <spring.boot.version>2.3.2.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR10</spring.cloud.version> <spring.cloud.version>Hoxton.SR10</spring.cloud.version>
<spring-cloud-alibaba.version>2.2.7.RELEASE</spring-cloud-alibaba.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId> <artifactId>spring-boot-dependencies</artifactId>

Loading…
Cancel
Save