From f24059fa3f792a33884c695e50b0442659d6d148 Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Mon, 25 Jul 2022 00:01:30 +0800 Subject: [PATCH] =?UTF-8?q?[=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD](master):?= =?UTF-8?q?=20springCloud-stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit springCloud-stream的使用实践和相关整合 --- .../dev-protocol-shardingsphere-base/pom.xml | 1 - dev-protocol-common/pom.xml | 29 +++++ .../.gitignore | 46 +++++++ .../dev-protocol-springcloud-stream/README.md | 114 ++++++++++++++++++ .../dev-protocol-springcloud-stream/pom.xml | 80 ++++++++++++ ...vProtocolSpringCloudStreamApplication.java | 21 ++++ .../baiye/stream/DefaultReceiveService.java | 38 ++++++ .../com/baiye/stream/DefaultSendService.java | 46 +++++++ .../stream/controller/MessageController.java | 47 ++++++++ .../com/baiye/stream/my/QReceiveService.java | 34 ++++++ .../com/baiye/stream/my/QSendService.java | 39 ++++++ .../main/java/com/baiye/stream/my/QSink.java | 23 ++++ .../java/com/baiye/stream/my/QSource.java | 26 ++++ .../QPartitionKeyExtractorStrategy.java | 31 +++++ .../partition/QPartitionSelectorStrategy.java | 32 +++++ .../src/main/java/com/baiye/vo/QMessage.java | 44 +++++++ .../src/main/resources/bootstrap.yml | 90 ++++++++++++++ .../src/main/resources/http/message.http | 7 ++ dev-protocol-springcloud/pom.xml | 22 ++++ pom.xml | 10 ++ 20 files changed, 779 insertions(+), 1 deletion(-) create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/.gitignore create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/README.md create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/pom.xml create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/DevProtocolSpringCloudStreamApplication.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultReceiveService.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultSendService.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/controller/MessageController.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QReceiveService.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSendService.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSink.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSource.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionKeyExtractorStrategy.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionSelectorStrategy.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/vo/QMessage.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/http/message.http create mode 100644 dev-protocol-springcloud/pom.xml diff --git a/database/shardingsphere/dev-protocol-shardingsphere-base/pom.xml b/database/shardingsphere/dev-protocol-shardingsphere-base/pom.xml index 9a89db4..035a02c 100644 --- a/database/shardingsphere/dev-protocol-shardingsphere-base/pom.xml +++ b/database/shardingsphere/dev-protocol-shardingsphere-base/pom.xml @@ -422,7 +422,6 @@ javax.annotation javax.annotation-api - ${annotation-api.version} diff --git a/dev-protocol-common/pom.xml b/dev-protocol-common/pom.xml index fccf8d9..ee4c3bb 100644 --- a/dev-protocol-common/pom.xml +++ b/dev-protocol-common/pom.xml @@ -16,4 +16,33 @@ 8 + + + + org.projectlombok + lombok + 1.16.18 + + + org.apache.commons + commons-lang3 + 3.11 + + + org.apache.commons + commons-collections4 + 4.4 + + + cn.hutool + hutool-all + 5.6.0 + + + com.alibaba + fastjson + 1.2.47 + + + \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/.gitignore b/dev-protocol-springcloud/dev-protocol-springcloud-stream/.gitignore new file mode 100644 index 0000000..d4beec9 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/.gitignore @@ -0,0 +1,46 @@ +### Example user template template +### Example user template + +# IntelliJ project files +.idea +*.iml +out +gen +### Maven template +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +# https://github.com/takari/maven-wrapper#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar + +### Java template +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/README.md b/dev-protocol-springcloud/dev-protocol-springcloud-stream/README.md new file mode 100644 index 0000000..f000d63 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/README.md @@ -0,0 +1,114 @@ +## SpringCloud Stream + +### 1. 为什么会出现 SpringCloud Stream +- 如果我们需要更换队列组件, 我们的代码就需要重写 +- 解耦业务代码和中间件代码 + +### 2. SpringCloud Stream 中的核心概念 +-Spring Messaging 模块 + - SpringMessaging 是 Spring Framework 中的一个模块, 其作用就是统一消息的编程模型 + - Messaging 对应的模型只包括 Payload 和 Header + +```java + + Producer --(send[Message])--> Message Channel --(receive)--> Consumer + +``` + +- 负责与中间件交互的抽象绑定器: Binder + - 通过 Binder 组件实现与外部消息系统通信, 屏蔽了底层中间件的使用细节 + - 消息分类映射为通信信道, 可以为不同类的消息自定义通信信道 +- 发送消息与接收消息的应用通信信道: Input, Output + +```java + Spring Cloud Stream Application + + Application Core + | |\ + | | + (inputs) (outputs) + | | + |/ | + Binder + | + Middleware +``` + +### 3. SpringCloud Stream 应用模型 +- 经典的 SpringCloud Stream 发布-订阅模型 +- 作者参考 Kafka 的模型设计的 + +```java + Message + | + Topic(Exchange) + | | +输入通道 输入通道 + | | +订阅者-1 订阅者-2 + +``` +- Topic 可以认为就是 Kafka 中的 Topic 概念 +- Producer 通过 Input 信道发布消息到 Topic 上 +- Consumer 通过 Output 信道消费 Topic 上的消息 + +### 4. SpringCloud Stream 消息分组和消费分区的配置与说明 + +```java + Message + | + Topic(Exchange) + —————————|————————— + | | +输入通道 输入通道 + | | +Service-A Service-B +Service-A Service-B + (GroupA) (GroupB) +``` +- SpringCloud Stream 消费分组 + - 消费分组: 应用多实例部署, 提供服务吞吐量, 且不重复消费消息 + - 不指定默认是每一个实例都会有个生成的消费组, 所以消息会被消费很多次 + - 我们尽量指定消费者组, 让其消息只消费一次 + - 忘记指定消费者组的情况, 会发生消息不一致的问题 +- SpringCloud Stream 消费者组模型 + - 应用的不同实例放在一个消费者组中, 每一条消息只会被一个实例消费 + - 消费者组的思想是通过多实例扩展服务吞吐量, 且不会造成消息的重复消费 + - 默认会对不指定消费者组, 都会进行发送消费, 除非你有这样的场景, 要不然建议是指定的 +--- + +- 消费分区 + - 消费分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理 + - 我们要确保指定的消息会被指定的消费者组的分区进行消费, 这样可以保证数据的先后和一致性 + - 通过自定义提取Key, 和转发策略来进行实现 + - 这样做可以为不支持消费分区的消息中间件也实现了这种功能的扩展 + +````java + HTTP --- + | --> Partition1 --> Average Processor + HTTP --- + | --> Partition2 --> Average Processor + HTTP --- +```` + +### 5. TODO-list +- 研究 Bus 消息总线的集合使用-实现消息的统一发送 +- 补充 stream 消息持久化 +- 补充 stream 监控 +- 补充 stream 异常处理 +- Spring Cloud Stream + RocketMQ 实现分布式事务 +- RabbitMQ 的整合 + + + + + + + + + + + + + + diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/pom.xml b/dev-protocol-springcloud/dev-protocol-springcloud-stream/pom.xml new file mode 100644 index 0000000..d171066 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/pom.xml @@ -0,0 +1,80 @@ + + + + dev-protocol + org.example + 1.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + dev-protocol-springcloud-stream + + + 8 + 8 + + + + + + org.example + dev-protocol-common + 1.0-SNAPSHOT + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + org.springframework.cloud + spring-cloud-starter-zipkin + + + org.springframework.kafka + spring-kafka + 2.5.0.RELEASE + + + + org.springframework.cloud + spring-cloud-stream + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + + + + + + + + + ${artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/DevProtocolSpringCloudStreamApplication.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/DevProtocolSpringCloudStreamApplication.java new file mode 100644 index 0000000..fbb1e50 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/DevProtocolSpringCloudStreamApplication.java @@ -0,0 +1,21 @@ +package com.baiye; + + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; + +/** + * 基于 SpringCloud Stream 构建消息驱动微服务 + * + * @author q + * @date 2022/07/24 + */ +@EnableDiscoveryClient +@SpringBootApplication +public class DevProtocolSpringCloudStreamApplication { + public static void main(String[] args) { + SpringApplication.run(DevProtocolSpringCloudStreamApplication.class, args); + } + +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultReceiveService.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultReceiveService.java new file mode 100644 index 0000000..b696a91 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultReceiveService.java @@ -0,0 +1,38 @@ +package com.baiye.stream; + +import com.alibaba.fastjson.JSON; +import com.baiye.vo.QMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; + +/** + * 使用默认的信道实现消息的接收 + * + * @author q + * @date 2022/07/24 + */ +@Slf4j +@EnableBinding( + Sink.class +) +public class DefaultReceiveService { + + /** + * 使用默认的输入信道接收消息 + * + * @param payloud payloud + */ + @StreamListener(Sink.INPUT) + public void receiveMessage(Object payloud) { + + log.info("in DefaultReceiveService consumer message start ... "); + // 进行消息的反序列化 + // FIXME: 这里使用的是 fast-json + QMessage message = JSON.parseObject(payloud.toString(), QMessage.class); + // 消费消息 + // FIXME: 这里使用的是 fast-json + log.info("in DefaultReceiveService consumer message success : [{}]", JSON.toJSONString(message)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultSendService.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultSendService.java new file mode 100644 index 0000000..1709607 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/DefaultSendService.java @@ -0,0 +1,46 @@ +package com.baiye.stream; + +import com.alibaba.fastjson.JSON; +import com.baiye.vo.QMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.messaging.support.MessageBuilder; + +/** + * 使用默认的消息信道进行发送消息 + * + * @author q + * @date 2022/07/24 + */ +@Slf4j +@EnableBinding( + Source.class +) +public class DefaultSendService { + + /** + * 注入通信信道源 + */ + public final Source source; + + public DefaultSendService(Source source) { + this.source = source; + } + + + /** + * 使用默认的输出信道发送消息 + * + * @param message 消息 + */ + public void sendMessage(QMessage message) { + // FIXME: 这里使用的是 fast-json + String msg = JSON.toJSONString(message); + log.info("in DefaultSendService send message: [{}]", msg); + + // Spring Messaging, 统一消息编程模型, 是 Stream 组件的重要组成部分之一 + source.output().send(MessageBuilder.withPayload(msg).build()); + } + +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/controller/MessageController.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/controller/MessageController.java new file mode 100644 index 0000000..6a83223 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/controller/MessageController.java @@ -0,0 +1,47 @@ +package com.baiye.stream.controller; + +import com.baiye.stream.DefaultSendService; +import com.baiye.stream.my.QSendService; +import com.baiye.vo.QMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 构建消息驱动 + * + * @author q + * @date 2022/07/24 + */ +@Slf4j +@RestController +@RequestMapping("/message") +public class MessageController { + + private final DefaultSendService defaultSendService; + + private final QSendService qSendService; + + public MessageController(DefaultSendService defaultSendService, QSendService qSendService) { + this.defaultSendService = defaultSendService; + this.qSendService = qSendService; + } + + /** + * 默认信道 + */ + @GetMapping("/default") + public void defaultSend() { + defaultSendService.sendMessage(QMessage.defaultMessage()); + } + + /** + * 自定义信道发送 + */ + @GetMapping("/q") + public void qSend() { + qSendService.sendMessage(QMessage.defaultMessage()); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QReceiveService.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QReceiveService.java new file mode 100644 index 0000000..a5ca118 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QReceiveService.java @@ -0,0 +1,34 @@ +package com.baiye.stream.my; + +import com.alibaba.fastjson.JSON; +import com.baiye.vo.QMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.handler.annotation.Payload; + +/** + * 使用自定义的输入信道实现消息的接收 + * + * @author q + * @date 2022/07/24 + */ +@Slf4j +@EnableBinding(QSink.class) +public class QReceiveService { + + /** + * 使用自定义的输入信道接收消息 + * + * @param payloud payloud + */ + @StreamListener(QSink.INPUT) + public void receiveMessage(@Payload Object payloud) { + + log.info(" in QReceiveService consumer message start ... "); + // 消息反序列化 + QMessage qMessage = JSON.parseObject(payloud.toString(), QMessage.class); + log.info("in v consumer message success: [{}]", JSON.toJSONString(qMessage)); + + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSendService.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSendService.java new file mode 100644 index 0000000..8e7e5f2 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSendService.java @@ -0,0 +1,39 @@ +package com.baiye.stream.my; + +import com.alibaba.fastjson.JSON; +import com.baiye.vo.QMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.messaging.support.MessageBuilder; + +/** + * 使用自定义的通信信道 QSendService 实现消息的发送 + * + * @author q + * @date 2022/07/24 + */ +@Slf4j +@EnableBinding(QSource.class) +public class QSendService { + + private final QSource source; + + public QSendService(QSource source) { + this.source = source; + } + + /** + * 使用自定义的输出信道发送消息 + * + * @param message 消息 + */ + public void sendMessage(QMessage message) { + // FIXME: 这里使用的是 fast-json + String msg = JSON.toJSONString(message); + log.info("in QSendService send message: [{}]", message); + source.qOutput().send( + MessageBuilder.withPayload(msg).build() + ); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSink.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSink.java new file mode 100644 index 0000000..c40fd87 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSink.java @@ -0,0 +1,23 @@ +package com.baiye.stream.my; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.messaging.SubscribableChannel; + +/** + * 自定义输入信道 + * + * @author q + * @date 2022/07/24 + */ +public interface QSink { + + String INPUT = "Q-InPut"; + + /** + * 输入信道的名称是 Q-InPut, 需要使用 Stream 绑定器在 yml 文件中配置 + * + * @return {@link SubscribableChannel} + */ + @Input(QSink.INPUT) + SubscribableChannel qTnput(); +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSource.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSource.java new file mode 100644 index 0000000..e995346 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/my/QSource.java @@ -0,0 +1,26 @@ +package com.baiye.stream.my; + +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; + +/** + * 自定义输出信道 + * + * @author q + * @date 2022/07/24 + */ +public interface QSource { + + String OUTPUT = "Q-OutPut"; + + + /** + * 输出信道的名称是 Q-OutPut, 需要使用 Stream 绑定器在 yml 文件中声明 + * + * @return {@link MessageChannel} + */ + @Output(QSource.OUTPUT) + MessageChannel qOutput(); + + +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionKeyExtractorStrategy.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionKeyExtractorStrategy.java new file mode 100644 index 0000000..e90b44f --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionKeyExtractorStrategy.java @@ -0,0 +1,31 @@ +package com.baiye.stream.partition; + +import com.alibaba.fastjson.JSON; +import com.baiye.vo.QMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +/** + * 自定义从 Message 中提取 partition key 的策略 + * + * 先提取 -> 再给Selector 去处理 + * + * @author q + * @date 2022/07/24 + */ +@Slf4j +@Component +public class QPartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy { + @Override + public Object extractKey(Message message) { + // 进行反序列化 + QMessage qMessage = JSON.parseObject(message.getPayload().toString(), QMessage.class); + // 自定义提取 Key + // todo 常见的策略有 加盐|加时间戳|加前后缀 + String key = qMessage.getProjectName(); + log.info("SpringCloud Stream Q Partition Key: [{}]", key); + return key; + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionSelectorStrategy.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionSelectorStrategy.java new file mode 100644 index 0000000..425b74a --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/stream/partition/QPartitionSelectorStrategy.java @@ -0,0 +1,32 @@ +package com.baiye.stream.partition; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.binder.PartitionSelectorStrategy; +import org.springframework.stereotype.Component; + +/** + * 决定 message 发送到哪个分区的策略 + * + * @author q + * @date 2022/07/24 + */ +@Slf4j +@Component +public class QPartitionSelectorStrategy implements PartitionSelectorStrategy { + /** + * 选择分区的策略 + * + * @param key 选择键 - 对应 QPartitionKeyExtractorStrategy 中是 ProjectName + * @param partitionCount 分区数 + * @return int + */ + @Override + public int selectPartition(Object key, int partitionCount) { + // todo 这里是一个示范 在这里用特定的业务处理来实现往哪个分区去发送 + int partition = key.toString().hashCode() % partitionCount; + + log.info("SpringCloud Stream Q Selector info: [{}], [{}], [{}]", key.toString(), partitionCount, partition); + + return partition; + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/vo/QMessage.java b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/vo/QMessage.java new file mode 100644 index 0000000..c662c65 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/java/com/baiye/vo/QMessage.java @@ -0,0 +1,44 @@ +package com.baiye.vo; + + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 消息传递对象: SpringCloud Stream + Kafka/RocketMQ + * + * @author q + * @date 2022/07/24 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class QMessage { + + private Integer id; + + private String projectName; + + private String org; + + private String author; + + private String version; + + + /** + * 返回一个默认的消息,方便使用 + * + * @return {@link QMessage} + */ + public static QMessage defaultMessage() { + return new QMessage( + 1, + "springCloud-stream-client", + "baiye.com", + "q", + "1.0" + ); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..775c208 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml @@ -0,0 +1,90 @@ +server: + port: 8006 + servlet: + context-path: /dev-protocol-springcloud-stream + +spring: + application: + name: dev-protocol-springcloud-stream + cloud: + nacos: + # 服务注册发现 + discovery: + enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可 + server-addr: 127.0.0.1:8848 + namespace: 1bc13fd5-843b-4ac0-aa55-695c25bc0ac6 + metadata: + management: + context-path: ${server.servlet.context-path}/actuator + # 消息驱动的配置 + stream: + # SpringCloud Stream + Kafka + kafka: + binder: + brokers: 127.0.0.1:9092 + auto-create-topics: true # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好 + # SpringCloud Stream + RocketMQ + # rocketmq: + # binder: + # name-server: 127.0.0.1:9876 + # fixme 开启 stream 分区支持 + instanceCount: 1 # 消费者的总数 + instanceIndex: 0 # 当前消费者的索引 + bindings: + # 默认发送方 + output: # 这里用 Stream 给我们提供的默认 output 信道 + destination: dev-protocol-springcloud-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic + content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要 + # fixme 消息分区 + producer: + # fixme partitionKeyExpression 这种方式需要我们需要对 author 字段进行各种处理, 这里不使用这个方式, 使用下面自定义的配置策略 + # partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性, 不存在 author 可能会报错 + partitionCount: 1 # 分区大小 + # 使用自定义的分区策略, 注释掉 partitionKeyExpression 根据提取的Key 做一个 Selector 下面配置的是自定义Bean的名称,注意首字母要大写 + partitionKeyExtractorName: qPartitionKeyExtractorStrategy + partitionSelectorName: qPartitionSelectorStrategy + # 默认接收方 + input: # 这里用 Stream 给我们提供的默认 input 信道 + destination: dev-protocol-springcloud-stream-client-default + # fixme 这里指定了group 方便其进行 分组消费 + group: dev-protocol-springcloud-stream-client-group-default + # fixme 消费者开启 分区支持 + consumer: + partitioned: true + # q 发送方 destination 对应 + qOutput: + destination: dev-protocol-springcloud-stream-client-q + content-type: text/plain + # q 接收方 + qInput: + destination: dev-protocol-springcloud-stream-client-q + # fixme 这里指定了group 方便其进行分组消费 + group: dev-protocol-springcloud-stream-q-group + + # spring-kafka 的配置 +# kafka: +# bootstrap-servers: 127.0.0.1:9092 +# producer: +# retries: 3 +# consumer: +# auto-offset-reset: latest +# sleuth: +# sampler: +# # ProbabilityBasedSampler 抽样策略 +# probability: 1.0 # 采样比例, 1.0 表示 100%, 默认是 0.1 +# # RateLimitingSampler 抽样策略 +# rate: 100 # 每秒间隔接受的 trace 量 +# zipkin: +# sender: +# type: kafka # 默认是 http +# base-url: http://localhost:9411/ + +# 暴露端点 +#management: +# endpoints: +# web: +# exposure: +# include: '*' +# endpoint: +# health: +# show-details: always diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/http/message.http b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/http/message.http new file mode 100644 index 0000000..8e1a6c7 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/http/message.http @@ -0,0 +1,7 @@ +### 发送默认信道消息 +GET http://127.0.0.1:8006/dev-protocol-springcloud-stream/message/default +Content-Type: application/json + +### 发送自定义信道消息 +GET http://127.0.0.1:8006/dev-protocol-springcloud-stream/message/q +Content-Type: application/json diff --git a/dev-protocol-springcloud/pom.xml b/dev-protocol-springcloud/pom.xml new file mode 100644 index 0000000..bccc41f --- /dev/null +++ b/dev-protocol-springcloud/pom.xml @@ -0,0 +1,22 @@ + + + + dev-protocol + org.example + 1.0-SNAPSHOT + + 4.0.0 + + dev-protocol-springcloud + 1.0-SNAPSHOT + + pom + + + 8 + 8 + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1874cb2..66cba3b 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,8 @@ mq/rocketmq/rocket-api bigdata/kafka/springboot-kafka mq/rocketmq/springboot-rocketmq + dev-protocol-springcloud + dev-protocol-springcloud/dev-protocol-springcloud-stream @@ -44,11 +46,19 @@ 2.3.2.RELEASE Hoxton.SR10 + 2.2.7.RELEASE + + com.alibaba.cloud + spring-cloud-alibaba-dependencies + ${spring-cloud-alibaba.version} + pom + import + org.springframework.boot spring-boot-dependencies