diff --git a/dev-protocol-springcloud/SpringCloud项目介绍.md b/dev-protocol-springcloud/SpringCloud项目介绍.md index d1075b3..e4af839 100644 --- a/dev-protocol-springcloud/SpringCloud项目介绍.md +++ b/dev-protocol-springcloud/SpringCloud项目介绍.md @@ -11,6 +11,7 @@ - [dev-protocol-springcloud-sleuth-zipkin](dev-protocol-springcloud-sleuth-zipkin) - 微服务容错 - (SpringCloud Netflix Hystrix) - [dev-protocol-springcloud-hystrix](dev-protocol-springcloud-hystrix) + - [dev-protocol-springcloud-hystrix-dashboard](dev-protocol-springcloud-hystrix-dashboard) - 消息驱动微服务 - (SpringCloud Stream) - todo - 分布式事务 - (SpringCloud Alibaba Seata) diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/README.md b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/README.md new file mode 100644 index 0000000..e63b277 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/README.md @@ -0,0 +1,3 @@ +## hystrix-dashboard + +- hystrix 监控面板 \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/pom.xml b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/pom.xml new file mode 100644 index 0000000..8f02a97 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/pom.xml @@ -0,0 +1,69 @@ + + + 4.0.0 + + org.example + dev-protocol + 1.0-SNAPSHOT + ../../pom.xml + + + dev-protocol-springcloud-hystrix-dashboard + 1.0-SNAPSHOT + jar + + + dev-protocol-springcloud-hystrix-dashboard + Hystrix Dashboard + + + + 8 + 8 + UTF-8 + + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.cloud + spring-cloud-starter-netflix-hystrix + + + org.springframework.cloud + spring-cloud-starter-netflix-hystrix-dashboard + + + + + + ${artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/java/org/example/HystrixDashboardApplication.java b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/java/org/example/HystrixDashboardApplication.java new file mode 100644 index 0000000..ce530dc --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/java/org/example/HystrixDashboardApplication.java @@ -0,0 +1,21 @@ +package org.example; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard; + +/** + *

hystrix dashboard 入口

+ * 控制台页面 http://127.0.0.1:9999/dev-protocol-springcloud-hystrix-dashboard/hystrix/ + * 需要监控的服务 stream 拼接: http://127.0.0.1:8111/dev-protocol-springcloud-hystrix/actuator/hystrix.stream + * */ +@EnableDiscoveryClient +@SpringBootApplication +@EnableHystrixDashboard // 开启 Hystrix Dashboard +public class HystrixDashboardApplication { + public static void main(String[] args) { + + SpringApplication.run(HystrixDashboardApplication.class, args); + } +} \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/resources/bootstrap.yml b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..976837d --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/resources/bootstrap.yml @@ -0,0 +1,33 @@ +server: + port: 9999 + servlet: + context-path: /dev-protocol-springcloud-hystrix-dashboard + +spring: + application: + name: dev-protocol-springcloud-hystrix-dashboard + cloud: + nacos: + # 服务注册发现 + discovery: + enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可 + server-addr: 127.0.0.1:8848 + # server-addr: 127.0.0.1:8848,127.0.0.1:8849,127.0.0.1:8850 # Nacos 服务器地址 + namespace: 1ccc74ae-9398-4dbe-b9d7-4f9addf9f40c + metadata: + management: + context-path: ${server.servlet.context-path}/actuator + +hystrix: + dashboard: + proxy-stream-allow-list: "127.0.0.1" + +# 暴露端点 +management: + endpoints: + web: + exposure: + include: '*' + endpoint: + health: + show-details: always diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/resources/http/dashboard.http b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/resources/http/dashboard.http new file mode 100644 index 0000000..2395cb6 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard/src/main/resources/http/dashboard.http @@ -0,0 +1,31 @@ +### 获取 Token +POST http://127.0.0.1:9001/dev-protocol-springcloud-gateway/dev-protocol-springcloud-hystrix/communication/token-by-feign +Content-Type: application/json +e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A +token: imooc + +{ + "username": "Qinyi@imooc.com", + "password": "25d55ad283aa400af464c76d713c07ad" +} + + +### 根据提供的 serviceId 获取实例信息 +GET http://127.0.0.1:9001/dev-protocol-springcloud-gateway/ecommerce-nacos-client/hystrix/hystrix-command-annotation?serviceId=e-commerce-nacos-client +Content-Type: application/json +e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A +token: imooc + + +### 根据提供的 serviceId 获取实例信息 +GET http://127.0.0.1:9001/dev-protocol-springcloud-gateway/ecommerce-nacos-client/hystrix/simple-hystrix-command?serviceId=e-commerce-nacos-client +Content-Type: application/json +e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A +token: imooc + + +### 根据提供的 serviceId 获取实例信息 +GET http://127.0.0.1:9001/dev-protocol-springcloud-gateway/ecommerce-nacos-client/hystrix/hystrix-observable-command?serviceId=e-commerce-nacos-client +Content-Type: application/json +e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A +token: imooc diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/NetflixHystrix.md b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/NetflixHystrix.md index 3146cbb..2192c05 100644 --- a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/NetflixHystrix.md +++ b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/NetflixHystrix.md @@ -21,25 +21,73 @@ - 请求依赖项失败后,可以选择出错或者是兜底回退 ### 使用注解方式实现服务的容错、降级 - +--- +- SpringCloud Netflix Hystrix 的三种模式 + - Hystrix 的断路器模式和后备策略模式 + - 断路器模式:设置超时或者失败等熔断策略 + - 后备策略模式:断路器模式触发之后,如果存在后备策略,则执行后备 + - Hystrix 的舱壁模式:Hystrix 舱壁模式的启发:货船为了进行防止漏水和火灾的扩散,会将货仓分隔为多个当发生灾害时,将所在货仓进行隔离就可以降低整艘船的风险 + - 舱壁模式功能就是:我们可以通过为各个服务分别指定线程池 + - 每一个远程资源调用都被放置在自己的线程池中,每一个线程池只有一定数量的线程来处理远程调用 + - 一个性能低下的服务,只会影响到同一个线程池中的其他服务调用。所以降低了会产生的负面影响 ### 使用编程方式实现服务的容错、降级 - [NacosClientHystrixCommand.java] - [NacosClientHystrixObservableCommand.java] ### 编程方式开启 Hystrix 请求缓存 - 先编写过滤器 [HystrixRequestContextServletFilter.java] +- Hystrix 的结果缓存指的是在一次 Hystrix 的请求上下文中 +- Hystrix 请求缓存的两种实现方式 + - 编程(继承 HystrixCommand)方式,重写 getCacheKey 方法即可 + - 注解方式 + - @CacheResult:该注解用来标记请求命令返回的结果应该被缓存,它必须与@HystrixCommand注解结合使用, 属性: cacheKeyMethod + - @CacheRemove: 该注解用来让请求命令的缓存失效,失效的缓存根据commandKey进行查找。属性: commandKey,cacheKeyMethod + - @CacheKey: 该注解用来在请求命令的参数上标记,使其作为cacheKey,如果没有使用此注解则会使用所有参数列表中的参数作为cacheKey, 属性: value ### 注解方式开启 Hystrix 请求缓存 - - +- ### 编程方式应用 Hystrix 请求合并 - - +- 默认情况下,每一个请求都会占用一个线程和一次网络请求,高并发场景下效率不高\ +- 使用 Hystrix 的请求合并,将多个请求 merge 为一个,提高服务的并发能力 +--- +- 请求合并的适用场景与注意事项 + - 适用场景:单个对象的查询并发数很高,服务提供方负载较高,就可以考虑使用请求合并 + - 注意事项 + - 请求在代码中人为的设置了延迟时间,会降低请求的响应速度 + - 可能会提高服务提供方的负载,因为返回 List 结果数据量偏大 + - 实现请求合并比较复杂 ### 注解方式应用 Hystrix 请求合并 - +- 使用注解的方式比较多 ### OpenFeign 集成 Hystrix 开启后备模式 - - +- OpenFeign 集成 Hystrix 的步骤 + - 在配置文件中开启 Hystrix 的熔断功能:feign.hystrix.enabled: true + - @FeignClient 注解的 fallback 和 fallbackFactory 属性 + - fallback 和 fallbackFactory 都是用于配置响应回退,但是不可以同时使用 + - fallbackFactory 能够获取到 OpenFeign 调用抛出的异常 ### 使用 Hystrix 监控面板监测客户端容错 - -### SpringCloud Netflix Hystrix 容错组件总结 \ No newline at end of file +- Hystrix Dashboard 是一个单独的应用,用来实时的对 Hystrix 的『使用情况』进行实时的监控 +### SpringCloud Netflix Hystrix 容错组件总结 +- ![Hystrix工作流程图.png](pic/Hystrix工作流程图.png) +--- +- Hystrix 完整的工作流程 + - 构造-个 HystrixCommand 或者 HystrixObservableCommand 对象 + - 表示对一个对依赖项请求的包装 + - 执行命令获取响应 + - execute():阻塞,直到收到响应或者抛出异常, 同步执行的, 调用的是 queue().get() + - queue():返回一个 Future, 异步执行的, 调用的是 observable.toBlocking().toFuture() + - observe():订阅代表响应的 Observable + - toObservable():返回一个 Observable,当你订阅它以后,将会执行Hystrix 命令并且推送它的响应 + - 总结: 最终每一个 HystrixCommand, 都是 Observable 的实现 + - 响应是否已经被缓存 + - 如果请求在缓存中的结果是存在的, 该请求在缓存中对应的结果是可用的, 缓存就会立即以一个 Observable 的形式被返回 + - 断路器是否打开 + - 缓存不存在的时候, 当你执行命令的时候, Hystrix 检查断路器是否打开, 如果打开, 不会执行任何命令, 执行回退逻辑, 如果是关闭, 检查容量是否可用 + - 线程池、信号量是否已经使用率 100% + - 判断, 线程池和信号量是否已经满了, 如果满了, 不会执行命令, 立马执行回退逻辑 + - HystrixObservableCommand.construct() or HystrixCommand.run() + - 对依赖项的调用请求, 真正的逻辑编写在这里, run 会返回一个响应, 或者抛出一个异常, construct 会返回一个 Observable 对象, 会推送一个响应或者一个 ERROR 通知, 会抛出一个 Timeout 异常 + - 计算电路健康 + - Hystrix 报告成功, 失败, 拒绝, 超时给断路器, 断路器会维护一组计算统计的计数器, 断路器会用这些统计数据来决定, 什么时候要跳闸, 后续的所有请求都会被短路, 直到一个恢复周期耗尽以后, 在第一次检查健康检查之后, 才会重新打开电路 + - 回退或返回成功响应 + - 一, 当你失败以后, HystrixCommand 会回退一个单一的回退值, HystrixObservableCommand 会返回一个或者多个回退值, 通过 fallback, 当 fallback 有异常也会被进一步抛出, 给调用者发送 ERROR 通知, 命令调用者进行处理 + - 二, 成功会直接抛出响应给调用者, 返回一个 Observable \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/pic/Hystrix工作流程图.png b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/pic/Hystrix工作流程图.png new file mode 100644 index 0000000..639892f Binary files /dev/null and b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/pic/Hystrix工作流程图.png differ diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/pom.xml b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/pom.xml index 3c5d3cb..deb2130 100644 --- a/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/pom.xml +++ b/dev-protocol-springcloud/dev-protocol-springcloud-hystrix/pom.xml @@ -49,6 +49,27 @@ fastjson 1.2.51 + + + + io.github.openfeign + feign-micrometer + + + org.springframework.cloud + spring-cloud-starter-openfeign + + + + io.github.openfeign + feign-okhttp + + + + io.github.openfeign + feign-gson + 12.1 + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.springframework.cloud + spring-cloud-context + 2.2.6.RELEASE + + + + org.springframework.kafka + spring-kafka + 2.5.0.RELEASE + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.1.0 + + + org.apache.commons + commons-lang3 + 3.11 + + + + mysql + mysql-connector-java + 8.0.19 + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + + ${artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java new file mode 100644 index 0000000..5d8dbdc --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java @@ -0,0 +1,14 @@ +package org.example; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + *

工程启动入口

+ * */ +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java new file mode 100644 index 0000000..43daacc --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java @@ -0,0 +1,41 @@ +package org.example.controller; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.example.kafka.KafkaProducer; +import org.example.vo.QMessage; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + *

SpringBoot 集成 kafka 发送消息

+ * */ +@Slf4j +@RestController +@RequestMapping("/kafka") +public class KafkaController { + + private final ObjectMapper mapper; + private final KafkaProducer kafkaProducer; + + public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) { + this.mapper = mapper; + this.kafkaProducer = kafkaProducer; + } + + /** + *

发送 kafka 消息

+ * */ + @GetMapping("/send-message") + public void sendMessage(@RequestParam(required = false) String key, + @RequestParam String topic) throws Exception { + + QMessage message = new QMessage( + 1, + "q-Study-Message" + ); + kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/RocketMQController.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/RocketMQController.java new file mode 100644 index 0000000..8d2bb65 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/RocketMQController.java @@ -0,0 +1,51 @@ +package org.example.controller; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.example.rocket.RocketMQProducer; +import org.example.vo.QMessage; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + *

SpringBoot 集成 RocketMQ

+ * */ +@Slf4j +@RestController +@RequestMapping("/rocket-mq") +public class RocketMQController { + + private static final QMessage RocketMQMessage = new QMessage( + 1, + "Q-Study-RocketMQ-In-SpringBoot" + ); + + private final RocketMQProducer rocketMQProducer; + + public RocketMQController(RocketMQProducer rocketMQProducer) { + this.rocketMQProducer = rocketMQProducer; + } + + @GetMapping("/message-with-value") + public void sendMessageWithValue() { + rocketMQProducer.sendMessageWithValue(JSON.toJSONString(RocketMQMessage)); + } + + @GetMapping("/message-with-key") + public void sendMessageWithKey() { + rocketMQProducer.sendMessageWithKey("qy", JSON.toJSONString(RocketMQMessage)); + } + + @GetMapping("/message-with-tag") + public void sendMessageWithTag() { + rocketMQProducer.sendMessageWithTag("qy", + JSON.toJSONString(RocketMQMessage)); + } + + @GetMapping("/message-with-all") + public void sendMessageWithAll() { + rocketMQProducer.sendMessageWithAll("q", "q", + JSON.toJSONString(RocketMQMessage)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java new file mode 100644 index 0000000..2882d17 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java @@ -0,0 +1,81 @@ +package org.example.kafka; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; + +import java.util.HashMap; +import java.util.Map; + +/** + *

通过代码自定义 Kafka 配置

+ * + * 复杂不会怎么更改的配置信息最好以代码的方式进行配置 + * 一般一些常用的改动的配置, 直接放在配置文件中 + * */ +@Configuration +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + /** + *

Kafka Producer 工厂类配置

+ * */ + @Bean + public ProducerFactory producerFactory() { + + Map configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new DefaultKafkaProducerFactory<>(configs); + } + + /** + *

Kafka Producer 客户端

+ * */ + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + /** + *

Kafka Consumer 工厂类配置

+ * */ + @Bean + public ConsumerFactory consumerFactory() { + + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + // 因为配置为拉取模式, 最多拉取 50条记录 + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(props); + } + + /** + *

Kafka Consumer 监听器工厂类配置

+ * */ + @Bean + public ConcurrentKafkaListenerContainerFactory + kafkaListenerContainerFactory() { + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + // 并发数就是一个消费者实例起几个线程 + factory.setConcurrency(3); + factory.setConsumerFactory(consumerFactory()); + + return factory; + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java new file mode 100644 index 0000000..1d911ae --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java @@ -0,0 +1,56 @@ +package org.example.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.example.vo.QMessage; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + *

Kafka 消费者

+ * */ +@Slf4j +@Component +public class KafkaConsumer { + + private final ObjectMapper mapper; + + public KafkaConsumer(ObjectMapper mapper) { + this.mapper = mapper; + } + + /** + *

监听 Kafka 消息并消费

+ * */ + @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka") + public void listener01(ConsumerRecord record) throws Exception { + + String key = record.key(); + String value = record.value(); + + QMessage kafkaMessage = mapper.readValue(value, QMessage.class); + log.info("in listener01 consume kafka message: [{}], [{}]", + key, mapper.writeValueAsString(kafkaMessage)); + } + + /** + *

监听 Kafka 消息并消费

+ * 不知道发送的类是什么类型的时候发送的 + * */ + @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1") + public void listener02(ConsumerRecord record) throws Exception { + + Optional _kafkaMessage = Optional.ofNullable(record.value()); + if (_kafkaMessage.isPresent()) { + Object message = _kafkaMessage.get(); + // 如果不能确定类型的时候, 下面的代码要进行包装 + QMessage kafkaMessage = mapper.readValue(message.toString(), + QMessage.class); + log.info("in listener02 consume kafka message: [{}]", + mapper.writeValueAsString(kafkaMessage)); + } + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java new file mode 100644 index 0000000..b63987c --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java @@ -0,0 +1,77 @@ +package org.example.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; + +import java.util.concurrent.TimeUnit; + +/** + *

kafka 生产者

+ * */ +@Slf4j +@Component +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public KafkaProducer(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + /** + *

发送 kafka 消息

+ * */ + public void sendMessage(String key, String value, String topic) { + + if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) { + throw new IllegalArgumentException("value or topic is null or empty"); + } + + ListenableFuture> future = StringUtils.isBlank(key) ? + kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value); + + // 异步回调的方式获取通知 + future.addCallback( + success -> { + // 元数据信息不为空 + assert null != success && null != success.getRecordMetadata(); + // 发送到 kafka 的 topic + String _topic = success.getRecordMetadata().topic(); + // 消息发送到的分区 + int partition = success.getRecordMetadata().partition(); + // 消息在分区内的 offset + long offset = success.getRecordMetadata().offset(); + + log.info("send kafka message success: [{}], [{}], [{}]", + _topic, partition, offset); + }, failure -> { + log.error("send kafka message failure: [{}], [{}], [{}]", + key, value, topic); + } + ); + // future 支持多次获取消息, 不需要重新发送消息 + + // 同步等待的方式获取通知 + try { +// SendResult sendResult = future.get(); + SendResult sendResult = future.get(5, TimeUnit.SECONDS); + + // 发送到 kafka 的 topic + String _topic = sendResult.getRecordMetadata().topic(); + // 消息发送到的分区 + int partition = sendResult.getRecordMetadata().partition(); + // 消息在分区内的 offset + long offset = sendResult.getRecordMetadata().offset(); + + log.info("send kafka message success: [{}], [{}], [{}]", + _topic, partition, offset); + } catch (Exception ex) { + log.error("send kafka message failure: [{}], [{}], [{}]", + key, value, topic); + } + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerMessageExt.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerMessageExt.java new file mode 100644 index 0000000..69a832f --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerMessageExt.java @@ -0,0 +1,29 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + *

第三个 RocketMQ 消费者,

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "q-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-message-ext" +) +public class RocketMQConsumerMessageExt implements RocketMQListener { + + @Override + public void onMessage(MessageExt message) { + + String value = new String(message.getBody()); + log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]", + message.getKeys(), value); // 能拿到消息中的 key + log.info("MessageExt: [{}]", JSON.toJSONString(message)); // 会慢一些 + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerObject.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerObject.java new file mode 100644 index 0000000..1a58319 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerObject.java @@ -0,0 +1,27 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.example.vo.QMessage; +import org.springframework.stereotype.Component; + +/** + *

第四个, RocketMQ 消费者, 指定消费带有 tag 的消息, 且消费的是 Java Pojo

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "imooc-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-tag-object", + selectorExpression = "q" // 根据 tag 做过滤 +) +public class RocketMQConsumerObject implements RocketMQListener { + @Override + public void onMessage(QMessage message) { + log.info("consume message in RocketMQConsumerObject: [{}]", + JSON.toJSONString(message)); + // so something + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerString.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerString.java new file mode 100644 index 0000000..5c274bd --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerString.java @@ -0,0 +1,28 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.example.vo.QMessage; +import org.springframework.stereotype.Component; + +/** + *

第一个 RocketMQ 消费者

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "q-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-string" +) +public class RocketMQConsumerString implements RocketMQListener { + + @Override + public void onMessage(String message) { + + QMessage rocketMessage = JSON.parseObject(message, QMessage.class); + log.info("consume message in RocketMQConsumerString: [{}]", + JSON.toJSONString(rocketMessage)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerTagString.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerTagString.java new file mode 100644 index 0000000..c5a8f7d --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQConsumerTagString.java @@ -0,0 +1,28 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.example.vo.QMessage; +import org.springframework.stereotype.Component; + +/** + *

第二个 RocketMQ 消费者, 指定了消费带有 tag 的消息

+ * */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "q-study-rocketmq", + consumerGroup = "q-springboot-rocketmq-tag-string", + selectorExpression = "qy" // 根据 tag 过滤, tag 中要带有 qy +) +public class RocketMQConsumerTagString implements RocketMQListener { + @Override + public void onMessage(String message) { + + QMessage rocketMessage = JSON.parseObject(message, QMessage.class); + log.info("consume message in RocketMQConsumerTagString: [{}]", + JSON.toJSONString(rocketMessage)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQProducer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQProducer.java new file mode 100644 index 0000000..13711ff --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/rocket/RocketMQProducer.java @@ -0,0 +1,100 @@ +package org.example.rocket; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.example.vo.QMessage; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +/** + *

通过 RocketMQ 发送消息

+ * Spring Messaging 模块 + * */ +@Slf4j +@Component +public class RocketMQProducer { + + /** 类似 Kafka 中的 topic, 默认的读写队列都是4个, 默认自动创建topic */ + private static final String TOPIC = "q-study-rocketmq"; + + /** RocketMQ 客户端 */ + private final RocketMQTemplate rocketMQTemplate; + + public RocketMQProducer(RocketMQTemplate rocketMQTemplate) { + this.rocketMQTemplate = rocketMQTemplate; + } + + /** + *

使用同步的方式发送消息, 不指定 key 和 tag

+ * */ + public void sendMessageWithValue(String value) { + + // 随机选择一个 Topic 的 Message Queue 发送消息 + SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value); + log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(sendResult)); + + SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly( + TOPIC, value, "QQQ" + ); + log.info("sendMessageWithValue orderly result: [{}]", + JSON.toJSONString(sendResultOrderly)); + } + + /** + *

使用异步的方式发送消息, 指定 key

+ * */ + public void sendMessageWithKey(String key, String value) { + + Message message = MessageBuilder.withPayload(value) + // 这个Key 不是分区的效果, 只是方便进行查询, 在设置的时候, 可以使用空格进行分开, 例如: aaaa bbb + .setHeader(RocketMQHeaders.KEYS, key).build(); + + // 异步发送消息, 并设定回调 + rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() { + + @Override + public void onSuccess(SendResult sendResult) { + log.info("sendMessageWithKey success result: [{}]", + JSON.toJSONString(sendResult)); + } + + @Override + public void onException(Throwable e) { + log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e); + } + }); + } + + /** + *

使用同步的方式发送消息, 带有 tag, 且发送的是 Java Pojo

+ * 发送消息可以定义自己的消息数据结构 + * */ + public void sendMessageWithTag(String tag, String value) { + + QMessage qMessage = JSON.parseObject(value, QMessage.class); + SendResult sendResult = rocketMQTemplate.syncSend( + String.format("%s:%s", TOPIC, tag), // 不同的消费者组使用不同的 tag + qMessage + ); + log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult)); + } + + /** + *

使用同步的方式发送消息, 带有 key 和 tag

+ * */ + public void sendMessageWithAll(String key, String tag, String value) { + + Message message = MessageBuilder.withPayload(value) + .setHeader(RocketMQHeaders.KEYS, key).build(); + SendResult sendResult = rocketMQTemplate.syncSend( + String.format("%s:%s", TOPIC, tag), + message + ); + log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult)); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java new file mode 100644 index 0000000..7d4c20f --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java @@ -0,0 +1,27 @@ +package org.example.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + *

通过 Kafka 传递的消息对象

+ * */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class QMessage { + + /** + * 唯一的标识一个消息对象 + */ + private Integer id; + + /** + * 项目名称 + */ + private String projectName; + + // todo 自己进行扩展 + +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml new file mode 100644 index 0000000..9695606 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml @@ -0,0 +1,49 @@ +server: + port: 8001 + servlet: + context-path: /dev-protocol-springcloud-message-study + +spring: + # SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers + kafka: + bootstrap-servers: 127.0.0.1:9092 + jpa: + show-sql: true + hibernate: + ddl-auto: none + properties: + hibernate.show_sql: true + hibernate.format_sql: true + open-in-view: false + datasource: + # 数据源 + url: jdbc:mysql://127.0.0.1:3306/dev_protocol_springcloud_project?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8 + username: root + password: root + type: com.zaxxer.hikari.HikariDataSource + driver-class-name: com.mysql.cj.jdbc.Driver + # 连接池 + hikari: + maximum-pool-size: 8 + minimum-idle: 4 + idle-timeout: 30000 + connection-timeout: 30000 + max-lifetime: 45000 + auto-commit: true + pool-name: devProtocolSpringcloudHikariCP +# consumer: + # 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成 +# group-id: imooc-study-ecommerce +# auto-offset-reset: latest +# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer +# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer +# producer: +# key-serializer: org.apache.kafka.common.serialization.StringSerializer +# value-serializer: org.apache.kafka.common.serialization.StringSerializer + +# RocketMQ 的配置, 这是最低配置 +rocketmq: + name-server: 127.0.0.1:9876 + producer: + # 发送同一类消息的设置为同一个 group, 保证唯一 + group: dev-protocol-springcloud-message-study diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..2366e3d --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml @@ -0,0 +1,16 @@ +spring: + profiles: + # prod, dev + active: dev + application: + name: dev-protocol-springcloud-message-study + +# 暴露端点 +management: + endpoints: + web: + exposure: + include: '*' + endpoint: + health: + show-details: always diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http new file mode 100644 index 0000000..6d183bd --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http @@ -0,0 +1,7 @@ +### kafka-send-message +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?key=qqq&topic=q-springboot +Content-Type: application/json + +### kafka-send-message - (测试kafka 支持无key消息, key 是用来分区的) +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?topic=q-springboot +Content-Type: application/json diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http new file mode 100644 index 0000000..dc3cd2d --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http @@ -0,0 +1,15 @@ +### message-with-value +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-value +Content-Type: application/json + +### message-with-key +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-key +Content-Type: application/json + +### message-with-tag +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-tag +Content-Type: application/json + +### message-with-all +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-all +Content-Type: application/json diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http new file mode 100644 index 0000000..de7d991 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http @@ -0,0 +1,7 @@ +### wrong-rollback-for +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-rollback-for +Content-Type: application/json + +### wrong-inner-call +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-inner-call +Content-Type: application/json diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml index 775c208..500ebe9 100644 --- a/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml +++ b/dev-protocol-springcloud/dev-protocol-springcloud-stream/src/main/resources/bootstrap.yml @@ -12,7 +12,7 @@ spring: discovery: enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可 server-addr: 127.0.0.1:8848 - namespace: 1bc13fd5-843b-4ac0-aa55-695c25bc0ac6 + namespace: 1ccc74ae-9398-4dbe-b9d7-4f9addf9f40c metadata: management: context-path: ${server.servlet.context-path}/actuator @@ -27,15 +27,15 @@ spring: # rocketmq: # binder: # name-server: 127.0.0.1:9876 - # fixme 开启 stream 分区支持 + # 开启 stream 分区支持 instanceCount: 1 # 消费者的总数 - instanceIndex: 0 # 当前消费者的索引 + instanceIndex: 0 # 当前消费者的索引, 多个的时候, 另外一个要配置成 1, 依次类推 bindings: # 默认发送方 output: # 这里用 Stream 给我们提供的默认 output 信道 destination: dev-protocol-springcloud-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要 - # fixme 消息分区 + # 消息分区 - 只对默认接收方和发送方进行配置, 下面自定义的原理相同 producer: # fixme partitionKeyExpression 这种方式需要我们需要对 author 字段进行各种处理, 这里不使用这个方式, 使用下面自定义的配置策略 # partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性, 不存在 author 可能会报错 @@ -46,9 +46,9 @@ spring: # 默认接收方 input: # 这里用 Stream 给我们提供的默认 input 信道 destination: dev-protocol-springcloud-stream-client-default - # fixme 这里指定了group 方便其进行 分组消费 + # 这里指定了group 方便其进行 分组消费 group: dev-protocol-springcloud-stream-client-group-default - # fixme 消费者开启 分区支持 + # 消费者开启 分区支持 consumer: partitioned: true # q 发送方 destination 对应 diff --git a/pom.xml b/pom.xml index 3d14b7d..fb471b6 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,8 @@ dev-protocol-springcloud/dev-protocol-springcloud-project-service-sdk dev-protocol-springcloud/dev-protocol-springcloud-project-goods-service dev-protocol-springcloud/dev-protocol-springcloud-hystrix + dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard + dev-protocol-springcloud/dev-protocol-springcloud-message-study